#include #include #include #include #include "job_starter.hpp" ////////////////////////////////////////////////////////////////////// job_starter::job_starter (std::string a_dir, mb_util::ini::ini ini) : ini_ (ini) { // first, initialize all endpoints according to the ini file. // the main ini section MAY contain a key 'backend_ini' which // then points to an external ini file which contains the detailed // backend ini sections. If that key is not present, these sections // are (backwards compatibly) assumed to be in the main ini file. mb_util::ini::section cfg = ini_.get_section ("mandelbrot"); mb_util::ini::section ep_ini = cfg; if ( ! cfg.get_entry ("backend_ini", "").empty () ) { ini_.merge (cfg.get_entry ("backend_ini")); } mb_util::ini::section ep_cfg = ep_ini.get_section ("backends"); mb_util::ini::entry_map backends = ep_cfg.get_entries (); mb_util::ini::entry_map :: iterator it; debug_ = ::atoi (cfg.get_entry ("debug", "0").c_str ()); for ( it = backends.begin (); it != backends.end (); it++ ) { std::string key = (*it).first; std::string val = (*it).second; if ( val == "yes" ) { mb_util::ini::section backend_config = ep_cfg.get_section (key); backend_config.add_entry ("debug", cfg.get_entry ("debug" , "0")); std::string url = backend_config.get_entry ("url" , ""); std::cout << "creating endpoint " << key << " \t ... " << std::flush; boost::shared_ptr ep (new endpoint (key, backend_config)); if ( ep->valid_ ) { endpoints_.push_back (ep); std::cout << "ok (" << url << ")" << std::endl; } else { std::cout << "failed (" << url << ")" << std::endl; } } } // for each endpoint, we run 'njobs' jobs. for ( unsigned int e = 0; e < endpoints_.size (); e++ ) { // try the next endpoint boost::shared_ptr ep = endpoints_[e]; if ( ep->valid_ ) { for ( unsigned int j = 0; j < ep->njobs_; j++ ) { try { int jobnum = clients_.size (); std::string ident = boost::lexical_cast (jobnum); // create a job description saga::job::description jd; jd.set_attribute (saga::job::attributes::description_executable, ep->exe_); // client parameters: // 0: path to advert directory to be used (job bucket) // 1: jobnum, == name of work bucket for that job (is that in loop later) std::vector args; args.push_back ("mandelbrot_client "); args.push_back (a_dir); args.push_back (ident); // append ep args std::vector epargs = saga::adaptors::utils::split (ep->args_, ' '); for ( unsigned int a = 0; a < epargs.size (); a++ ) { args.push_back (epargs[a]); } jd.set_vector_attribute (saga::job::attributes::description_arguments, args); if ( ! ep->pwd_.empty () ) { jd.set_attribute (saga::job::attributes::description_working_directory, ep->pwd_); } #if 0 // let the clients store stdout/stderr to /tmp/mandelbrot_client.[id].out/err // FIXME: this should get enabled once the bes adaptor supports it, and // is able to stage the output files back into the pwd { std::string out; std::string err; out += "/tmp/mandelbrot_client." + ident + ".out"; err += "/tmp/mandelbrot_client." + ident + ".err"; jd.set_attribute (saga::job::attributes::description_output, out); jd.set_attribute (saga::job::attributes::description_error, err); } #endif ep->cnt_jreq_++; std::cout << "starting job " << ident << " on " << ep->name_ << " \t ... " << std::flush; saga::job::job j = ep->service_.create_job (jd); j.run (); if ( saga::job::Running != j.get_state () ) { std::cout << "failure - could not run " << ep->exe_ << " " << ep->args_ << std::endl; j.cancel (); // clean up resources ep->log_ << "job spawning failed [1]: " << ep->exe_ << " " << ep->args_ << "\n"; // do not use this job } else { // keep job (wrapped in client) boost::shared_ptr c (new client (ident, j, ep)); clients_.push_back (c); client_map_[c->id_] = c; // store full jobid in ep log ep->cnt_jrun_++; std::cout << "ok " << c->id_short_ << std::endl; ep->log_ << "spawned client " << jobnum << " on " << ep->name_ << ": " << c->id_short_ << "\n"; } if ( debug_ ) { std::cout << " command : " << ep->exe_; for ( unsigned int i = 0; i < args.size (); i++ ) { std::cout << " " << args[i]; } std::cout << std::endl; } } catch ( const saga::exception & e ) { std::cout << "failure - could not start exe " << ep->exe_ << " " << ep->args_ << std::endl; ep->log_ << "job spawning failed [2]: " << ep->exe_ << " " << ep->args_ << "\n" << e.what () << "\n\n"; } catch ( const std::exception & e ) { std::cout << "failure - could not start exe " << ep->exe_ << " " << ep->args_ << std::endl; ep->log_ << "job spawning failed [2]: " << ep->exe_ << " " << ep->args_ << "\n" << e.what () << "\n\n"; } catch ( char const * s ) { std::cout << "failure - could not start exe " << ep->exe_ << " " << ep->args_ << std::endl; ep->log_ << "job spawning failed [2]: " << ep->exe_ << " " << ep->args_ << "\n" << s << "\n\n"; } catch ( ... ) { std::cout << "failure - could not start exe " << ep->exe_ << " " << ep->args_ << std::endl; ep->log_ << "job spawning failed [2]: " << ep->exe_ << " " << ep->args_ << "\n"; } } } } if ( clients_.size () == 0 ) { throw "Could not start any jobs!"; } } job_starter::~job_starter (void) { for ( unsigned int e = 0; e < endpoints_.size (); e++ ) { std::cout << "closing endpoint " << e << std::endl; endpoints_[e]->cancel (); } }