#ifndef DIGEDAG_SCHEDULER_HPP #define DIGEDAG_SCHEDULER_HPP #include #include #include #include #include "dag.hpp" #include "node.hpp" #include "edge.hpp" #include "util/mutex.hpp" #include "util/thread.hpp" #include "util/scoped_lock.hpp" namespace digedag { class dag; class node; class edge; class enactor; class scheduler : public digedag::util::thread, public util::enable_shared_from_this { private: struct job_info_t { std::string rm; std::string host; std::string pwd; std::string path; }; std::map job_info_; std::string data_src_pwd_; std::string data_tgt_pwd_; std::string data_src_host_; std::string data_tgt_host_; saga::session & session_; dag * dag_; std::string policy_; // scheduling policy bool initialized_; bool stopped_; // queues std::deque > queue_nodes_; std::deque > queue_edges_; std::set active_files_; // see FIXME in task_run boost::shared_ptr enact_nodes_; boost::shared_ptr enact_edges_; int max_nodes_; int max_edges_; int active_nodes_; int active_edges_; std::map > node_task_map_; std::map > edge_task_map_; util::mutex mtx_; void lock (void) { mtx_.lock (); }; void unlock (void) { mtx_.unlock (); }; // list of known nodes and edges, which helps to avoid scheduling them // twice. Its actually only used for nodes right now, as edges get only // fired once anyway. Nodes however can get fired multiple times. std::set known_nodes_; std::set known_edges_; public: scheduler (dag * d, const std::string & policy, saga::session session); ~scheduler (void); void parse_src (void); void stop (void); void thread_work (void); bool hook_dag_create (void); bool hook_dag_destroy (void); bool hook_dag_schedule (void); bool hook_dag_run_pre (void); bool hook_dag_run_post (void); bool hook_dag_run_done (void); bool hook_dag_run_fail (void); bool hook_dag_wait (void); bool hook_node_add (boost::shared_ptr n); bool hook_node_remove (boost::shared_ptr n); bool hook_node_run_pre (boost::shared_ptr n); bool hook_node_run_done (boost::shared_ptr n); bool hook_node_run_fail (boost::shared_ptr n); bool hook_edge_add (boost::shared_ptr e); bool hook_edge_remove (boost::shared_ptr e); bool hook_edge_run_pre (boost::shared_ptr e); bool hook_edge_run_done (boost::shared_ptr e); bool hook_edge_run_fail (boost::shared_ptr e); saga::session hook_saga_get_session (void); void work_finished (saga::task t, std::string flag); void dump_map (const std::map > & map); }; } // namespace digedag #endif // DIGEDAG_SCHEDULER_HPP