// Copyright (c) 2005-2007 Hartmut Kaiser // Copyright (c) 2005-2007 Andre Merzky (andre@merzky.net) // // Distributed under the Boost Software License, Version 1.0. // (See accompanying file LICENSE or copy at // http://www.boost.org/LICENSE_1_0.txt) #include #ifndef SAGA_SKIP_CONFIG_HPP # include "../config/config.hpp" #endif #ifndef SAGA_WINDOWS // C includes #include #include #include #include #include #ifdef SAGA_APPLE #include #define environ (*_NSGetEnviron()) #elif !defined(BOOST_WINDOWS) extern char **environ; #endif #ifndef MAX_PATH # define MAX_PATH _POSIX_PATH_MAX #endif // saga includes #include // saga adaptor icnludes #include #include #include // saga engine includes #include #include // saga package includes // saga package includes #include #include #include // adaptor includes #include "posix_job.hpp" #include "posix_job_iostream.hpp" // constructor job_cpi_impl::job_cpi_impl (proxy * p, cpi_info const & info, saga::ini::ini const & glob_ini, saga::ini::ini const & adap_ini, TR1::shared_ptr adaptor) : base_cpi (p, info, adaptor, cpi::Noflags), session_ (p->get_session ()) { log ("posix job c'tor"); instance_data idata (this); adaptor_data_type adata (this); // check if we have a rm url. If yes, check if we are asked for. if ( ! idata->rm_.get_string ().empty () ) { // initialize our service URL url_ = idata->rm_; // check if URL is usable if ( ! url_.get_host ().empty () && url_.get_host () != "localhost" && url_.get_host () != adata->localhost () ) { SAGA_OSSTREAM strm; strm << "Could not initialize job for [" << idata->rm_ << "]. " << "Only 'localhost' and " << adata->localhost ()<< " are supported."; SAGA_ADAPTOR_THROW(SAGA_OSSTREAM_GETSTRING(strm), saga::BadParameter); } if ( ! url_.get_scheme ().empty () && url_.get_scheme () != "fork" && url_.get_scheme () != "any" ) { SAGA_OSSTREAM strm; strm << "Could not initialize job for [" << idata->rm_ << "]. " << "Only any:// and fork:// schemes are supported."; SAGA_ADAPTOR_THROW(SAGA_OSSTREAM_GETSTRING(strm), saga::BadParameter); } } else { // if we don't have an URL at all, we accept } log ("posix job ctor 1"); if ( idata->init_from_jobid_ ) { log ("posix job ctor from jobid"); adaptor_data_type adata (this); jobid_ = idata->jobid_; pid_ = jobid2pid (jobid_); log ("init from jobid", jobid_); // try to find the job from ps (list) adata->find_known_jobs (); default_job_adaptor::known_job_list_type::iterator begin = adata->known_job_list_.begin (); default_job_adaptor::known_job_list_type::iterator end = adata->known_job_list_.end (); default_job_adaptor::known_job_list_type::iterator iter; struct default_job_adaptor::known_job_type known_job; bool found = false; for ( iter = begin; ! found && iter != end; ++iter) { if ( pid_ == iter->pid ) { known_job.uid = iter->uid; known_job.pid = iter->pid; known_job.exe = iter->exe; known_job.args = iter->args; found = true; } } if ( ! found ) { log ("posix job ctor ! from jobid"); SAGA_ADAPTOR_THROW ("could not find job for jobid", saga::BadParameter); } log ("posix job ctor from jobid ok"); // fill job description std::vector chosts; { adaptor_data_type adata (this); chosts.push_back (adata->localhost ()); } jd_.set_attribute (saga::job::attributes::description_executable, known_job.exe); jd_.set_attribute (saga::job::attributes::description_interactive, saga::attributes::common_false); jd_.set_vector_attribute (saga::job::attributes::description_arguments, known_job.args); jd_.set_vector_attribute (saga::job::attributes::description_candidate_hosts, chosts); // known_job.dump (); if ( pid_ != getpid () ) { log ("posix job ctor ! self"); is_self_ = false; } else { log ("posix job ctor self"); // we are a job::self - fill some more into the job description is_self_ = true; // get cwd char a_cwd[MAX_PATH]; if ( NULL != getcwd (a_cwd, MAX_PATH) ) { jd_.set_attribute (saga::job::attributes::description_working_directory, a_cwd); } // get env std::vector a_env; char ** ep; for ( ep = environ; *ep != NULL; ++ep ) { std::string tmp (*ep); a_env.push_back (tmp); } jd_.set_vector_attribute (saga::job::attributes::description_environment, a_env); // we don't support interactive self's,as we very likely did not spawn // ourself ;-) jd_.set_attribute (saga::job::attributes::description_interactive, saga::attributes::common_false); log ("posix job ctor self ok"); } // we inited from job id, and created job description - store it idata->jd_ = jd_; log ("posix job ctor from jobid ok"); } else { log ("posix job ctor ! from jobid"); // we don't init from jobid, this we init from job description // check if we can run on the candidate hosts if ( ! jd_is_local (jd_) ) { SAGA_ADAPTOR_THROW ("Cannot submit to CandidateHosts.", saga::BadParameter); } // save the job description jd_ = idata->jd_; } // set some default jd attribs interactive_ = jd_is_interactive (jd_); log ("interactive", interactive_); // create a new impl job. Note that this job may have io channels, but they // are not connected as of yet. No jobid is available before run(). impl_ = new impl::posix_job (jd_); log ("posix job ctor got impl"); if ( NULL == impl_ ) { SAGA_ADAPTOR_THROW ("could not instanciate posix job impl", saga::NoSuccess); } impl_->set_jd (jd_); impl_->set_jobid (pid_); if ( idata->init_from_jobid_ ) { // set state to running (as we found the job in PS, we assume its running) // FIXME: could very well be sleeping, or Zombie... // impl_->set_state (saga::job::Running); } // FIXME: register metrics etc. log ("posix job ctor done"); } // destructor job_cpi_impl::~job_cpi_impl (void) { log ("posix job d'tor"); if ( NULL != impl_ ) { delete (impl_); } } // SAGA API functions // FIXME: how is an attribute_get on state triggereing a get_state() // and an respective state update? void job_cpi_impl::sync_get_state (saga::job::state & ret) { ret = impl_->get_state (); } void job_cpi_impl::sync_get_description (saga::job::description & ret) { log ("getting description"); ret = jd_; } void job_cpi_impl::sync_get_job_id (std::string & ret) { ret = jobid_; } // access streams for communication with the child void job_cpi_impl::sync_get_stdin (saga::job::ostream & ret) { if ( ! interactive_ ) { SAGA_ADAPTOR_THROW ("job is not interactive", saga::IncorrectState); } posix_job_adaptor_ostream tmp (this, impl_->get_stdout ()); ret = tmp; } void job_cpi_impl::sync_get_stdout (saga::job::istream & ret) { if ( ! interactive_ ) { SAGA_ADAPTOR_THROW ("job is not interactive", saga::IncorrectState); } log (" = stdout", impl_->get_stdout ()); posix_job_adaptor_istream tmp (this, impl_->get_stdout ()); ret = tmp; } void job_cpi_impl::sync_get_stderr (saga::job::istream & ret) { if ( ! interactive_ ) { SAGA_ADAPTOR_THROW ("job is not interactive", saga::IncorrectState); } posix_job_adaptor_istream tmp (this, impl_->get_stderr ()); ret = tmp; } // we assume that sending a SIGUSR1 triggers an application level // checkpoint. The signal num should be an configurable adaptor // option (FIXME). void job_cpi_impl::sync_checkpoint (saga::impl::void_t & ret) { impl_->signal (SIGUSR1); } void job_cpi_impl::sync_migrate (saga::impl::void_t & ret, saga::job::description jd) { // migrate just forks a new version of myself... // can only migrate to localhost if ( ! jd_is_local (jd) ) { SAGA_ADAPTOR_THROW ("Cannot submit to CandidateHosts.", saga::BadParameter); } // migrated jobs are NEVER interactive if ( jd_is_interactive (jd) ) { SAGA_ADAPTOR_THROW ("Cannot migrate interactive jobs.", saga::BadParameter); } saga::job::job job = saga::adaptors::job ("fork://localhost", jd, session_); job.run (); // this is the original process - the migrated one is up, so we have // actually two instances at the moment. We should be able to safely shut // off the old one now. #ifdef MIGRATE_KILLS log ("killing self after migrate"); impl_->cancel (); // oops, suicide failed, still alive? Hmm, thats a bug! SAGA_ADAPTOR_THROW ("could not finish self after spawning new instance.", saga::NoSuccess); #else // FIXME: for now, dayinlife relies on the migrate not to kill, but to // simply return... log ("! killing self after migrate"); #endif return; } void job_cpi_impl::sync_signal (saga::impl::void_t & ret, int signal) { impl_->signal (signal); } // suspend the child process void job_cpi_impl::sync_suspend (saga::impl::void_t & ret) { impl_->suspend (); } // resume the child process void job_cpi_impl::sync_resume (saga::impl::void_t & ret) { impl_->resume (); } ////////////////////////////////////////////////////////////////////// // inherited from the task interface void job_cpi_impl::sync_run (saga::impl::void_t & ret) { // retrieve jd, check it, and run. instance_data idata (this); adaptor_data_type adata (this); if ( ! idata->jd_is_valid_ ) { SAGA_ADAPTOR_THROW ("Job description cannot be retrieved.", saga::NotImplemented); } // retrieve and check saga job description for this host jd_ = idata->jd_; // set jd for impl_ impl_->set_jd (jd_); if ( jd_.attribute_exists (saga::job::attributes::description_executable) ) { std::string exe = jd_.get_attribute (saga::job::attributes::description_executable); struct ::stat stat_buf; int ret = ::stat (exe.c_str (), &stat_buf); if ( -1 == ret && ENOENT == errno ) { SAGA_ADAPTOR_THROW ("executable does not exist", saga::BadParameter); } } if ( impl_->get_state () != saga::job::New ) { SAGA_ADAPTOR_THROW ("run can only be called on New jobs.", saga::IncorrectState); } impl_->run (); if ( impl_->get_state () == saga::job::Running || impl_->get_state () == saga::job::Done || impl_->get_state () == saga::job::Failed ) { // // run was successful it seems, so we set the 'Created' attribute saga::adaptors::attribute jobattr (this); std::time_t current = 0; std::time (¤t); jobattr.set_attribute (saga::job::attributes::created, ctime (¤t)); } // create the jobid { adaptor_data_type adata (this); jobid_ = adata->pid2jobid (impl_->get_jobid ()); } saga::adaptors::attribute attr (this); attr.set_attribute (saga::job::attributes::jobid, jobid_); } void job_cpi_impl::sync_cancel (saga::impl::void_t & ret, double timeout) { impl_->cancel (); } // wait for the child process to terminate // FIXME: should we use alarm(2) ? void job_cpi_impl::sync_wait (bool & ret, double timeout) { // delay for nanosleep in sec # define WAIT_DELAY 0.01 double time = 0; ret = false; struct timespec delay; struct timespec remain; delay.tv_sec = 0; delay.tv_nsec = (long int) (WAIT_DELAY * 1000000000); // < 10^9 bool blocking = false; if ( 0 > timeout ) { blocking = true; } while ( blocking || (time < timeout) ) { if ( saga::job::Done == impl_->get_state () || saga::job::Failed == impl_->get_state () || saga::job::Canceled == impl_->get_state () ) { ret = true; break; } while ( -1 == ::nanosleep (&delay, &remain) ) { if ( EINTR == errno ) { // interrupted by signal - sleep again delay = remain; } else { // some other error! FIXME: strerror SAGA_ADAPTOR_THROW ("nanosleep failed.", saga::NoSuccess); } } time += WAIT_DELAY; } } ////////////////////////////////////////////////////////////////////// pid_t job_cpi_impl::jobid2pid (std::string jobid) { size_t start = 0; size_t end = 0; std::string pid_str; pid_t pid; start = jobid.find_first_of ("[", 2); end = jobid.find_first_of ("]", start); if ( end != std::string::npos) { pid_str = jobid.substr (start + 1, end - start - 1); } std::istringstream pid_strstream (pid_str); pid = atoi (pid_str.c_str ()); return (pid); } bool job_cpi_impl::jd_is_local (saga::job::description & jd) { bool can_run = false; if ( jd.attribute_exists (saga::job::attributes::description_candidate_hosts) ) { adaptor_data_type adata (this); std::vector chosts = jd.get_vector_attribute (saga::job::attributes::description_candidate_hosts); for ( unsigned int i = 0; i < chosts.size (); i++ ) { if ( "localhost" == chosts[i] || adata->localhost () == chosts[i] ) { can_run = true; } } } else { can_run = true; } return can_run; } bool job_cpi_impl::jd_is_interactive (saga::job::description & jd) { if ( jd.attribute_exists (saga::job::attributes::description_interactive) ) { if ( saga::attributes::common_false == jd.get_attribute (saga::job::attributes::description_interactive) ) { return false; } else { return true; } } jd.set_attribute (saga::job::attributes::description_interactive, saga::attributes::common_false); return false; } #endif // SAGA_WINDOWS