#include #include #include #include "util/split.hpp" #include "node.hpp" #include "scheduler.hpp" namespace digedag { node::node (node_description & nd, std::string name, boost::shared_ptr scheduler, saga::session session) : nd_ ( nd) , rm_ ( "") , name_ ( name) , state_ ( Incomplete) , is_void_ ( false) , valid_task_ ( false) , task_ ( saga::task::New) , scheduler_ ( scheduler) , session_ ( session) { std::stringstream ss; if ( ! nd.attribute_exists (node_attributes::executable) ) { throw ("Cannot handle node w/o executable"); } ss << nd.get_attribute (node_attributes::executable); if ( nd.attribute_exists (node_attributes::arguments) ) { std::vector args = nd.get_vector_attribute (node_attributes::arguments); for ( unsigned int i = 0; i < args.size (); i++ ) { ss << " " << args[i]; } cmd_ = ss.str (); } } node::node (std::string cmd, std::string name, boost::shared_ptr scheduler, saga::session session) : rm_ ( "") , cmd_ ( cmd) , name_ ( name) , state_ ( Incomplete) , is_void_ ( false) , valid_task_ ( false) , task_ ( saga::task::New) , scheduler_ ( scheduler) , session_ ( session) { // parse cmd into node description std::vector elems = split (cmd_); nd_.set_attribute (node_attributes::executable, elems[0]); nd_.set_attribute (node_attributes::interactive, saga::attributes::common_false); elems.erase (elems.begin ()); nd_.set_vector_attribute (node_attributes::arguments, elems); } node::node (boost::shared_ptr scheduler, saga::session session) : rm_ ( "") , cmd_ ( "-") , name_ ( "void") , state_ ( Incomplete) , is_void_ ( true) , valid_task_ ( false) , task_ ( saga::task::New) , scheduler_ ( scheduler) , session_ ( session) { } node::~node (void) { std::cout << " === node destructed" << std::endl; } void node::set_name (const std::string name) { name_ = name; } void node::add_edge_in (boost::shared_ptr e) { edge_in_.push_back (e); // set initial input edge state edge_states_[e->get_name ()] = e->get_state (); } void node::add_edge_out (boost::shared_ptr e) { edge_out_.push_back (e); } void node::sync_state (bool verbose) { for (unsigned int i=0; iget_name() << " / " << edge_in_[i]->get_state() << " internal state: " << edge_states_[edge_in_[i]->get_name()] << std::endl << "node::sync_edge_state: syncing internal node edge_states_ " << edge_states_[edge_in_[i]->get_name()] << " = " << edge_in_[i]->get_state() << std::endl; } edge_states_[edge_in_[i]->get_name()] = edge_in_[i]->get_state(); } set_state(Done); for (unsigned int i=0; iget_state() != Done) set_state(Incomplete); } } void node::dryrun (void) { // check if all input data are ready std::map :: iterator begin = edge_states_.begin (); std::map :: iterator end = edge_states_.end (); std::map :: iterator it; for ( it = begin; it != end; it++ ) { if ( Done != it->second ) { return; } } // update state get_state (); if ( Pending != state_ ) return; std::cout << std::string (" node : ") << get_name () << " \t -> " << cmd_ << std::endl; state_ = Done; for ( unsigned int i = 0; i < edge_out_.size (); i++ ) { edge_out_[i]->dryrun (); } } void node::reset (void) { state_ = Incomplete; for ( unsigned int i = 0; i < edge_out_.size (); i++ ) { edge_out_[i]->reset (); } } // this call starts the sequence which will eventually run the node's job // (application). The edge parameter points to the edge which actually fired // this node, so that the node can keep track of the state for its incoming // edges. Only if all those edges are Done, fire will actually *do* anything. // // Nodes can also be fired by the dag itself (thisnk INPUT node). A void edge // is then given as parameter. void node::fire (boost::shared_ptr e) { if ( state_ == Stopped ) return; // std::cout << std::string (" === node ") << get_name () << ": " // << e->get_name () << " fired me: " << state_to_string (e->get_state ()) // << std::endl; // store state of firing edge edge_states_[e->get_name ()] = Done; // proceed with the normal fire procedure fire (); } void node::fire (void) { // update own state - we got fired by an edge most likely, and state needs // to be updated for that edge at least. get_state (); // Check if node is indeed ready to go if ( Pending != state_ ) { std::cout << std::string (" === node ") << get_name () << ": " << state_to_string (state_) << std::endl; return; } // all input edges are Done, i.e. all input data are available. We // can thus really execute the node application. // // ### scheduler hook - leave it to the scheduler to call our work routine boost::shared_ptr me = shared_from_this (); assert (me); if ( scheduler_->hook_node_run_pre (me) ) { std::cout << std::string (" === node ") << get_name () << " [" << state_to_string (state_) << "] " << " was accepted by scheduler" << std::endl; } else { std::cout << std::string (" === node ") << get_name () << " [" << state_to_string (state_) << "] " << " was declined by scheduler" << std::endl; } } saga::task node::work_start (void) { if ( state_ == Stopped ) return task_; assert ( state_ == Pending ); // we have work to do, and scheduler lets us go ahead. We are finally // Running state_ = Running; if ( is_void_ ) { // fake a noop task, which does nothing: simply returnh the empty // Done task... task_ = saga::task (saga::task::Done); std::cout << " === node run : " << get_name () << " (void): " << state_to_string (state_) << " - " << task_.get_id () << std::endl; } else { assert ( nd_.attribute_exists ("Executable") ); try { saga::job::description jd (nd_); // jd.set_attribute (saga::job::attributes::description_working_directory, "/tmp/0/"); // jd.set_attribute (saga::job::attributes::description_interactive, "true"); // jd.set_attribute (saga::job::attributes::description_input, "/dev/null"); // jd.set_attribute (saga::job::attributes::description_output, // std::string ("/tmp/out.") + get_id ()); // jd.set_attribute (saga::job::attributes::description_error, // std::string ("/tmp/err.") + get_id ()); saga::job::service js (session_, rm_); saga::job::job j = js.create_job (jd); j.run (); task_ = j; std::cout << " === node run : " << get_name () << " (void): " << state_to_string (state_) << " - " << task_.get_id () << std::endl; } catch ( const saga::exception & e ) { std::cout << " === running node " << get_name () << " failed: \n" << e.what () << std::endl; state_ = Failed; task_ = saga::task (saga::task::Failed); } } valid_task_ = true; return task_; } void node::work_done (void) { if ( state_ == Stopped ) return; // we don't assert here for state != Done and state_ != Failed, as // get_state may have set these states meanwhile, looking at the job // state. state_ = Done; std::cout << std::string (" === node done: ") << get_name () << " (" << get_cmd () << ") " << std::endl; if ( state_ == Done ) { // get data staged out, e.g. fire outgoing edges for ( unsigned int i = 0; i < edge_out_.size (); i++ ) { std::cout << " === node " << get_id () << " fires edge " << edge_out_[i]->get_name () << std::endl; edge_out_[i]->fire (shared_from_this ()); } } // ### scheduler hook scheduler_->hook_node_run_done (shared_from_this ()); return; } void node::work_failed (void) { if ( state_ == Stopped ) return; assert ( state_ != Failed ); assert ( state_ != Done ); try { if ( task_.get_state () == saga::task::Failed ) { // task_.rethrow (); saga::no_success e (task_, "job failed for unknown reason"); throw e; } } catch ( const saga::exception & e ) { std::cout << " === node " << get_name () << " set to failed by scheduler: \n" << e.what () << std::endl; } state_ = Failed; } void node::stop () { if ( task_.get_state () == saga::task::Running ) { task_.cancel (); } state_ = Stopped; } void node::dump (void) { std::cout << " node " << get_name () << " [" << host_ << ":" << pwd_ << " : " << cmd_ << "]" << " (" << state_to_string (get_state ()) << ")" << std::endl; for ( unsigned int i = 0; i < edge_in_.size (); i++ ) { edge_in_[i]->dump (); } } std::string node::get_id (void) const { return name_; } std::string node::get_name (void) const { return name_; } node_description node::get_description (void) const { return nd_; } void node::set_state (state s) { state_ = s; } state node::return_state (void) { return state_; } // FIXME: it is not nice to have such fundamental side effects in get_state! // That code needs to eventually move into a callback on the job state metric. state node::get_state (void) { // std::cout << " === node " << get_name () << " : state before check " << state_to_string (state_) << std::endl; // final states just return if ( state_ == Stopped || state_ == Done || state_ == Failed ) { // std::cout << " === node " << get_name () << " is in final state" << std::endl; return state_; } if ( state_ == Incomplete ) { // check if any input data failed std::map :: iterator begin = edge_states_.begin (); std::map :: iterator end = edge_states_.end (); std::map :: iterator it; for ( it = begin; it != end; it++ ) { if ( Failed == it->second ) { // std::cout << " !!! node " << get_name () << " failed due to edge " << it->first << std::endl; state_ = Failed; return state_; } } // check if all input data are ready for ( it = begin; it != end; it++ ) { if ( Done != it->second ) { // std::cout << " === node " << get_name () << " : input '" // << it->first << "' is missing: " << state_to_string (it->second) << std::endl; state_ = Incomplete; return state_; } } // no dep failed, all Done - we are pending! // std::cout << " === node " << get_name () // << " : no input missing: now Pending!" << std::endl; state_ = Pending; } // we can only depend from the job state if a job was // actually created if ( valid_task_ ) { std::cout << " === node " << get_name () << " : checking task state" << std::endl; switch ( task_.get_state () ) { case saga::job::New: std::cout << " === node is almost running: " << get_name () << std::endl; state_ = Pending; break; case saga::job::Running: // std::cout << " === node is running: " << get_name () << std::endl; state_ = Running; break; case saga::job::Done: state_ = Done; scheduler_->hook_node_run_done (shared_from_this ()); break; // Canceled, Failed, Unknown, New - all invalid default: state_ = Failed; std::cout << std::string (" node ") << get_name () << " : job failed - cancel: " << cmd_ << std::endl; // ### scheduler hook scheduler_->hook_node_run_fail (shared_from_this ()); break; } // switch } // std::cout << " === node " << get_name () << " : state after check " << state_to_string (state_) << std::endl; return state_; } void node::set_pwd (std::string pwd) { pwd_ = pwd; saga::url u_pwd (pwd_); nd_.set_attribute (node_attributes::working_directory, u_pwd.get_path ()); // set pwd for all incoming and outgoing edges for ( unsigned int i = 0; i < edge_in_.size (); i++ ) { edge_in_[i]->set_pwd_tgt (pwd); } for ( unsigned int i = 0; i < edge_out_.size (); i++ ) { edge_out_[i]->set_pwd_src (pwd); } } void node::set_rm (std::string rm) { // std::cout << " === setting rm to " << rm << std::endl; rm_ = rm; } void node::set_host (std::string host) { host_ = host; // std::cout << " === setting host to " << host << std::endl; std::vector chosts; chosts.push_back (host); nd_.set_vector_attribute (node_attributes::candidate_hosts, chosts); // set host for all incoming and outgoing edges for ( unsigned int i = 0; i < edge_in_.size (); i++ ) { edge_in_[i]->set_host_tgt (host); } for ( unsigned int i = 0; i < edge_out_.size (); i++ ) { edge_out_[i]->set_host_src (host); } } void node::set_path (std::string path) { path_ = path; std::vector new_env; if ( nd_.attribute_exists (node_attributes::environment) ) { std::vector old_env = nd_.get_vector_attribute (node_attributes::environment); bool found = false; for ( unsigned int i = 0; i < old_env.size (); i++ ) { std::vector words = split (old_env[i], "="); if ( words[0] == "PATH" ) { if ( path.empty () ) { new_env.push_back (old_env[i]); } else { if ( words.size () == 1 ) { new_env.push_back (words[0] + "=" + path); } { new_env.push_back (words[0] + "=" + words[1] + ":" + path); } } found = true; } else { // not PATH new_env.push_back (old_env[i]); } } if ( ! found ) { if ( ! path.empty () ) { new_env.push_back (std::string ("PATH=") + path); } } } else { // no env at all if ( ! path.empty () ) { new_env.push_back (std::string ("PATH=") + path); } } // replace env nd_.set_vector_attribute (node_attributes::environment, new_env); // FIXME: condor adaptor does not evaluate path. Thus, we set the // executable name to absolute file name // Note that this mechanism required rm_ to be set first if ( nd_.attribute_exists (node_attributes::executable) && saga::url (rm_).get_scheme () == "condor" ) { if ( ! path.empty () ) { nd_.set_attribute (node_attributes::executable, path + "/" + nd_.get_attribute (node_attributes::executable)); } } } std::string node::get_cmd (void) { if ( is_void_ ) { return "void"; } std::string out (nd_.get_attribute ("Executable")); if ( nd_.attribute_exists ("Arguments") ) { std::vector args = nd_.get_vector_attribute ("Arguments"); for ( unsigned int i = 0; i < args.size (); i++ ) { out += " " + args[i]; } } return out; } } // namespace digedag