/*
 * Purpose (as far as this copy shows): main() builds a launch command from
 * argv[1] -- prefixed with "mpirun -machinefile $PBS_NODEFILE -np N" when the
 * job type given in argv[2] is "MPI"/"mpi", with N taken from count_nodes() --
 * runs it in a boost::thread via run_application(), and then wakes up every
 * CHECKPOINT_CHK_INTERVALL seconds (timed_join) until that thread finishes.
 * The helper functions below compare files found on disk against the URLs
 * returned by chkpt.list_files() and build gsiftp:// URLs for the local host.
 *
 * NOTE(review): this copy of the file looks damaged -- please confirm against
 * version control before building or editing it:
 *   - the source is collapsed onto two physical lines, so every line comment
 *     now swallows the code that follows it on the same line;
 *   - the #include directives carry no header names, std::vector has no
 *     template argument, and several statements break off at a '<' (e.g. the
 *     "start saga-cpr-worker" output and the usage message), which suggests
 *     that all text between '<' and '>' was stripped;
 *   - the definition of run_application() and the heads of
 *     update_checkpoints(), print_registered_checkpoints() and check_file()
 *     are not present, and the file ends in the middle of count_nodes().
 * The code tokens below are left exactly as found.
 */
/* * saga-cpr-worker.cpp * saga_applications * * Created by luckow on 18.06.08. */ #include #include #include #include #include #include #include "boost/filesystem/operations.hpp" #include "boost/filesystem/path.hpp" #include #define CHECKPOINT_NAME "remd_checkpoint" #define CHECKPOINT_CHK_INTERVALL 300 #define MAX_URL 255 namespace fs = boost::filesystem; void run_application(std::string launch_command); saga::url build_url(fs::path file); bool check_file(fs::path file, std::vector urls); std::string get_hostname(void); void update_checkpoints(std::string checkpoint_dir); void print_registered_checkpoints(); int count_nodes(std::string filename); int main (int argc, char* argv[]) { struct timeval startTime, endTime; double atime; gettimeofday(&startTime, NULL); std::cout<<"start saga-cpr-worker"< 4){ std::cout << "Usage: " << argv[0] << "\"\" "<=3){ job_type = std::string(argv[2]); if(job_type=="MPI"||job_type=="mpi"){ pbs_nodefile.assign(saga::safe_getenv("PBS_NODEFILE")); } } if(argc==4){ checkpoint_dir = std::string(argv[3]); } // Init Migol/Monitoring // uses per default Application Information Service (AIS) configured in // the $SAGA_LOCATON/share/saga/saga_adaptor_migol_cpr.ini // required for all subsequent CPR calls (otherwise exception is thrown) saga::cpr::service js; std::ostringstream command_stream; if (pbs_nodefile!="" && (job_type=="MPI" || job_type=="mpi")){ //mpi command_stream <<"mpirun -machinefile "<< pbs_nodefile << " " ; //number nodes int count = count_nodes(pbs_nodefile); command_stream << " -np " << count << " "; //arguments command_stream << argv[1]; } else { command_stream << argv[1]; } //execute command in separate thread std::string command = command_stream.str(); boost::thread application_thread(TR1::bind(run_application, command)); // This loop monitors the application threads and checks whether new checkpoint // files exist while(!application_thread.timed_join(boost::posix_time::seconds(CHECKPOINT_CHK_INTERVALL))) { 
std::cout<<"Check for application thread..."< lfns; lfns = chkpt.list_files(); std::cout << "Received files: " <status()) ) { ++file_count; fs::path p = dir_itr->path(); bool exists = check_file(p, lfns); std::cout << "File: " << p << " exists: "<< ((exists==0) ? "false":"true") <<"\n"; if(!exists){ saga::url u = build_url(p); std::cout<< "Create file: " << u <path().leaf() << " " << ex.what() << std::endl; } } } //print_registered_checkpoints(); } /* build gsiftp:// urls (scheme + local hostname + "/" + file path) for referencing of checkpoint files */ saga::url build_url(fs::path file){ saga::url url("gsiftp://" + get_hostname() + "/" + file.string()); std::cout< files = chkpt.list_files(); std::cout << "Received files: " < urls){ for (unsigned int i = 0; i < urls.size(); i++) { fs::path local_file = urls[i].get_path(); //std::cout << "AIS URL: " + local_file.string() << " Local URL: " << file.string() << std::endl; if(file==local_file){ return true; } } return false; } /** get hostname of the local machine via gethostname(), using a MAX_URL-byte buffer **/ std::string get_hostname(void) { char buffer[MAX_URL] = { '\0' }; gethostname(buffer, sizeof(buffer)); return std::string(buffer); } /** count the lines of the given file; returns 1 when the file cannot be opened **/ int count_nodes(std::string filename){ std::ifstream datafile(filename.c_str()); if(!datafile){ std::cerr << "File not found: " << filename << std::endl; return 1; } int count=0; for (std::string line; std::getline(datafile, line);) { count++; } std::cout << "found " << count << " lines."<