// Copyright (c) 2005-2008 Hartmut Kaiser // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) #if !defined(ADAPTOR_GRIDSAM_JOBSUBMISSION_HK20070912_0917AM) #define ADAPTOR_GRIDSAM_JOBSUBMISSION_HK20070912_0917AM #include #include #include #include #include #include #include #include #include #include #include #include #include "gsoap_helper.hpp" #include "stubs/gridsam/gridsamJobSubmissionSOAPBindingProxy.h" /////////////////////////////////////////////////////////////////////////////// namespace util { /////////////////////////////////////////////////////////////////////////// class SubmitJobRequest { public: SubmitJobRequest(soap* soap, std::string const& exename) : registry_(soap), request_(registry_.create(soap_new__gridsam__submitJob, soap_delete__gridsam__submitJob)), posix_app_(NULL) { // create and initialize job description instance gridsam__JobDescriptionType* jd = registry_.create( soap_new_gridsam__JobDescriptionType, soap_delete_gridsam__JobDescriptionType); // create and initialize the job definition instance jsdl__JobDefinition_USCOREType* jsdl_def = registry_.create( soap_new_jsdl__JobDefinition_USCOREType, soap_delete_jsdl__JobDefinition_USCOREType); // create JSDL job description instance jsdl__JobDescription_USCOREType* jsdl_jd = registry_.create( soap_new_jsdl__JobDescription_USCOREType, soap_delete_jsdl__JobDescription_USCOREType); // create JSDL resource description instance jsdl__Resources_USCOREType* jsdl_rd = registry_.create( soap_new_jsdl__Resources_USCOREType, soap_delete_jsdl__Resources_USCOREType); // create and initialize the application instance jsdl__Application_USCOREType* jsdl_app = registry_.create( soap_new_jsdl__Application_USCOREType, soap_delete_jsdl__Application_USCOREType); // create and initialize the POSIX application instance posix_app_ = registry_.create( soap_new_jsdlposix__POSIXApplication_USCOREType, soap_delete_jsdlposix__POSIXApplication_USCOREType); // create and initialize the executable file name jsdlposix__FileName_USCOREType* posix_exename = registry_.create( soap_new_jsdlposix__FileName_USCOREType, soap_delete_jsdlposix__FileName_USCOREType); posix_exename->__item = exename; // chain up all created elements posix_app_->jsdlposix__Executable = posix_exename; jsdl_jd->jsdl__Resources = jsdl_rd; jsdl_jd->jsdl__Application = jsdl_app; jsdl_def->jsdl__JobDescription = jsdl_jd; jd->jsdl__JobDefinition = jsdl_def; request_->gridsam__JobDescription.push_back(jd); } ~SubmitJobRequest() {} _gridsam__submitJob* get() { return request_; } // set the arguments in the request void set_arguments(std::vector const& args) { if (args.empty()) return; // create a proper array of arguments std::vector::const_iterator end = args.end(); for (std::vector::const_iterator it = args.begin(); it != end; ++it) { jsdlposix__Argument_USCOREType* arg = registry_.create( soap_new_jsdlposix__Argument_USCOREType, soap_delete_jsdlposix__Argument_USCOREType); arg->__item = *it; posix_app_->jsdlposix__Argument.push_back(arg); } } // set the environment for the new job void set_environment(std::map const& keyval) { if (keyval.empty()) return; // create a proper array of arguments std::map::const_iterator end = keyval.end(); for (std::map::const_iterator it = keyval.begin(); it != end; ++it) { jsdlposix__Environment_USCOREType* env = registry_.create( soap_new_jsdlposix__Environment_USCOREType, soap_delete_jsdlposix__Environment_USCOREType); env->name = (*it).first; env->__item = (*it).second; posix_app_->jsdlposix__Environment.push_back(env); } } // set working directory for the new job void set_working_directory(std::string const& wd) { // create and initialize a new working directory item jsdlposix__DirectoryName_USCOREType* working_dir = registry_.create(soap_new_jsdlposix__DirectoryName_USCOREType, soap_delete_jsdlposix__DirectoryName_USCOREType); working_dir->__item = wd; posix_app_->jsdlposix__WorkingDirectory = working_dir; } // set the names of the redirected input/output files void set_stdin_file(std::string const& name) { jsdlposix__FileName_USCOREType* file = registry_.create( soap_new_jsdlposix__FileName_USCOREType, soap_delete_jsdlposix__FileName_USCOREType); file->__item = name; posix_app_->jsdlposix__Input = file; } void set_stdout_file(std::string const& name) { jsdlposix__FileName_USCOREType* file = registry_.create( soap_new_jsdlposix__FileName_USCOREType, soap_delete_jsdlposix__FileName_USCOREType); file->__item = name; posix_app_->jsdlposix__Output = file; } void set_stderr_file(std::string const& name) { jsdlposix__FileName_USCOREType* file = registry_.create( soap_new_jsdlposix__FileName_USCOREType, soap_delete_jsdlposix__FileName_USCOREType); file->__item = name; posix_app_->jsdlposix__Error = file; } // functionality for setting resource characteristics void set_TotalCPUCount(int count) { jsdl__RangeValue_USCOREType* val = registry_.create( soap_new_jsdl__RangeValue_USCOREType, soap_delete_jsdl__RangeValue_USCOREType); jsdl__Exact_USCOREType* exact = registry_.create( soap_new_jsdl__Exact_USCOREType, soap_delete_jsdl__Exact_USCOREType); exact->__item = count; val->Exact.push_back(exact); get_jsdl_resource_description()->jsdl__TotalCPUCount = val; } void set_TotalCPUTime(int count) { jsdl__RangeValue_USCOREType* val = registry_.create( soap_new_jsdl__RangeValue_USCOREType, soap_delete_jsdl__RangeValue_USCOREType); jsdl__Boundary_USCOREType* upper = registry_.create( soap_new_jsdl__Boundary_USCOREType, soap_delete_jsdl__Boundary_USCOREType); upper->__item = count; val->UpperBoundedRange = upper; get_jsdl_resource_description()->jsdl__TotalCPUTime = val; } void set_TotalPhysicalMemory(double memory) { jsdl__RangeValue_USCOREType* val = registry_.create( soap_new_jsdl__RangeValue_USCOREType, soap_delete_jsdl__RangeValue_USCOREType); jsdl__Boundary_USCOREType* lower = registry_.create( soap_new_jsdl__Boundary_USCOREType, soap_delete_jsdl__Boundary_USCOREType); lower->__item = memory; val->LowerBoundedRange = lower; get_jsdl_resource_description()->jsdl__TotalPhysicalMemory = val; } void set_ProcessCountLimit(int count) { jsdlposix__Limits_USCOREType* val = registry_.create( soap_new_jsdlposix__Limits_USCOREType, soap_delete_jsdlposix__Limits_USCOREType); val->__item = boost::lexical_cast(count); posix_app_->jsdlposix__ProcessCountLimit = val; } void set_ThreadCountLimit(int count) { jsdlposix__Limits_USCOREType* val = registry_.create( soap_new_jsdlposix__Limits_USCOREType, soap_delete_jsdlposix__Limits_USCOREType); val->__item = boost::lexical_cast(count); posix_app_->jsdlposix__ThreadCountLimit = val; } void set_CPUArchitecture(std::string const& cpuarch) { struct soap_code_map { jsdl__ProcessorArchitectureEnumeration code; char const* const string; }; static soap_code_map const processor_architectures[] = { { jsdl__ProcessorArchitectureEnumeration__sparc, saga::job::attributes::cpuarchitecture_sparc }, { jsdl__ProcessorArchitectureEnumeration__powerpc, saga::job::attributes::cpuarchitecture_powerpc }, { jsdl__ProcessorArchitectureEnumeration__x86, saga::job::attributes::cpuarchitecture_x86 }, { jsdl__ProcessorArchitectureEnumeration__x86_USCORE32, saga::job::attributes::cpuarchitecture_x86_32 }, { jsdl__ProcessorArchitectureEnumeration__x86_USCORE64, saga::job::attributes::cpuarchitecture_x86_64 }, { jsdl__ProcessorArchitectureEnumeration__parisc, saga::job::attributes::cpuarchitecture_parisc }, { jsdl__ProcessorArchitectureEnumeration__mips, saga::job::attributes::cpuarchitecture_mips }, { jsdl__ProcessorArchitectureEnumeration__ia64, saga::job::attributes::cpuarchitecture_ia64 }, { jsdl__ProcessorArchitectureEnumeration__arm, saga::job::attributes::cpuarchitecture_arm }, { jsdl__ProcessorArchitectureEnumeration__other, saga::job::attributes::cpuarchitecture_other }, { (jsdl__ProcessorArchitectureEnumeration)-1, NULL } }; int i = 0; for (/**/; NULL != processor_architectures[i].string; ++i) { if (cpuarch == processor_architectures[i].string) break; } if (NULL == processor_architectures[i].string) { throw std::runtime_error("SubmitJobRequest::set_CPUArchitecture: " "couldn't match processor architecture name to a valid " "enumerator value: '" + cpuarch + "'"); } jsdl__CPUArchitecture_USCOREType* val = registry_.create( soap_new_jsdl__CPUArchitecture_USCOREType, soap_delete_jsdl__CPUArchitecture_USCOREType); val->jsdl__CPUArchitectureName = processor_architectures[i].code; get_jsdl_resource_description()->jsdl__CPUArchitecture = val; } void add_StageInStep(std::string const& source, std::string const& target, bool overwrite) { jsdl__JobDescription_USCOREType* jd = get_jsdl_job_description(); jsdl__DataStaging_USCOREType* stage = registry_.create( soap_new_jsdl__DataStaging_USCOREType, soap_delete_jsdl__DataStaging_USCOREType); saga::url u(source); stage->jsdl__FileName = u.get_path(); stage->jsdl__CreationFlag = overwrite ? jsdl__CreationFlagEnumeration__overwrite : jsdl__CreationFlagEnumeration__append; stage->jsdl__Source = registry_.create( soap_new_jsdl__SourceTarget_USCOREType, soap_delete_jsdl__SourceTarget_USCOREType); stage->jsdl__Source->jsdl__URI = registry_.create(target); jd->jsdl__DataStaging.push_back(stage); } void add_StageOutStep(std::string const& source, std::string const& target, bool overwrite) { jsdl__JobDescription_USCOREType* jd = get_jsdl_job_description(); jsdl__DataStaging_USCOREType* stage = registry_.create( soap_new_jsdl__DataStaging_USCOREType, soap_delete_jsdl__DataStaging_USCOREType); saga::url u (target); stage->jsdl__FileName = u.get_path(); stage->jsdl__CreationFlag = overwrite ? jsdl__CreationFlagEnumeration__overwrite : jsdl__CreationFlagEnumeration__append; stage->jsdl__Target = registry_.create( soap_new_jsdl__SourceTarget_USCOREType, soap_delete_jsdl__SourceTarget_USCOREType); stage->jsdl__Target->jsdl__URI = registry_.create(source); jd->jsdl__DataStaging.push_back(stage); } // serialize the POSIXApplication description void serialize() { get_jsdl_app()->__any.clear(); get_jsdl_app()->__any.push_back( serialize_to_xml(registry_, posix_app_, "jsdlposix:POSIXApplication")); } protected: gridsam__JobDescriptionType* get_job_description(int i = 0) const { return request_->gridsam__JobDescription[i]; } jsdl__JobDescription_USCOREType* get_jsdl_job_description() const { return get_job_description()-> jsdl__JobDefinition->jsdl__JobDescription; } jsdl__Application_USCOREType* get_jsdl_app() const { return get_jsdl_job_description()->jsdl__Application; } jsdl__Resources_USCOREType* get_jsdl_resource_description() const { return get_jsdl_job_description()->jsdl__Resources; } private: soap_registry registry_; _gridsam__submitJob* request_; jsdlposix__POSIXApplication_USCOREType* posix_app_; }; /////////////////////////////////////////////////////////////////////////// class SubmitJobResponse { public: SubmitJobResponse(soap* soap) : registry_(soap), response_(registry_.create(soap_new__gridsam__submitJobResponse, soap_delete__gridsam__submitJobResponse)) { } ~SubmitJobResponse() {} _gridsam__submitJobResponse* get() { return response_; } // return the job-id of the submitted job std::string get_job_id() const { return response_->gridsam__JobIdentifier[0]->ID; } private: soap_registry registry_; _gridsam__submitJobResponse* response_; }; /////////////////////////////////////////////////////////////////////////////// } // namespace util /////////////////////////////////////////////////////////////////////////////// class JobSubmission : public JobSubmissionSOAPBindingProxy { private: typedef JobSubmissionSOAPBindingProxy base_type; JobSubmission* this_() { return this; } public: JobSubmission(saga::impl::v1_0::cpi* cpi, std::string const& endpoint, std::list const& ctxs, std::string const& exename) : endpoint_(endpoint), request_(this_(), exename), response_(this_()) { this->soap_endpoint = endpoint_.c_str(); saga::impl::exception_list exceptions; std::list::const_iterator end = ctxs.end(); for (std::list::const_iterator it = ctxs.begin(); it != end; ++it) { std::string certs, usercert, userkey, userpass; certs = retrieve_attribute(*it, saga::attributes::context_certrepository); usercert = retrieve_attribute(*it, saga::attributes::context_usercert); userkey = retrieve_attribute(*it, saga::attributes::context_userkey); userpass = retrieve_attribute(*it, saga::attributes::context_userpass); try { util::connect_to_gridsam(cpi, this, certs, usercert, userkey, userpass); } catch (saga::adaptors::exception const& e) { exceptions.add(e); } } if (exceptions.get_error_count()) { SAGA_ADAPTOR_THROW_PLAIN(cpi, exceptions.get_message(), exceptions.get_error()) } } ~JobSubmission() {} // initialize different parts of the request void set_arguments(std::vector const& args) { request_.set_arguments(args); } void set_environment(std::map const& env) { request_.set_environment(env); } void set_working_directory(std::string const& wd) { request_.set_working_directory(wd); } void set_stdin_file(std::string const& name) { request_.set_stdin_file(name); } void set_stdout_file(std::string const& name) { request_.set_stdout_file(name); } void set_stderr_file(std::string const& name) { request_.set_stderr_file(name); } void set_ProcessCountLimit(int count) { request_.set_ProcessCountLimit(count); } void set_ThreadCountLimit(int count) { request_.set_ThreadCountLimit(count); } void set_TotalCPUCount(int count) { request_.set_TotalCPUCount(count); } void set_TotalCPUTime(int count) { request_.set_TotalCPUTime(count); } void set_TotalPhysicalMemory(double count) { request_.set_TotalPhysicalMemory(count); } void set_CPUArchitecture(std::string const& cpuarch) { request_.set_CPUArchitecture(cpuarch); } void add_StageInStep(std::string const& source, std::string const& target, bool overwrite) { request_.add_StageInStep(source, target, overwrite); } void add_StageOutStep(std::string const& source, std::string const& target, bool overwrite) { request_.add_StageOutStep(source, target, overwrite); } // do the submission int submitJob(std::string& job_id) { request_.serialize(); int result = base_type::submitJob(request_.get(), response_.get()); if (SOAP_OK == result) job_id = response_.get_job_id(); return result; } std::string error() { char buffer[512] = { '\0' }; soap_sprint_fault(this, buffer, sizeof(buffer)); return buffer; } private: std::string endpoint_; util::SubmitJobRequest request_; util::SubmitJobResponse response_; }; #endif