#!/usr/bin/env python """Module many_job. This Module is used to launch a set of bigjobs. """ import sys import getopt import saga import time import pdb import os import traceback import advert_job import logging import many_job """ These are parameters that you want to change for the jobs """ NUMBER_STAGES = 2 NUMBER_JOBS_PER_STAGE = 2 WORK_DIR = "/work/yye00/ICAC/Simulations/" SIMULATION_DIR_SUFFIX = "Simulation_id_" SIMULATION_EXE = "/work/yye00/ICAC/cactus_SAGASandTank" SIMULATION_PAR_FILE_SUFFIX = "/work/yye00/ICAC/ParFiles/ParFile_id_" ENKF_EXE = "/bin/date" CHECK_OUTPUT_CMD = "find ./* -iname *.h5 | xargs h5ls |grep \"BLACKOILBASE::Pw\"" """ Now create the work directories by iterating over the range and creating sub-directories This works fine for LOCAL SYSTEMS ONLY and needs to be revisited for other systems """ for i in range(0, NUMBER_JOBS_PER_STAGE): simdirname = WORK_DIR + SIMULATION_DIR_SUFFIX + "%05d" %i if not os.path.isdir(simdirname+"/"): os.mkdir(simdirname+"/") """ These are the arrays of parameters for the various jobs, you only need the sizes if they are different """ #from numarray import * #JOB_SIZES=ones(NUMBER_JOBS_PER_STAGE) #JOB_SIZES[1:NUMBER_JOBS_PER_STAGE] = 4 JOB_SIZES=[] for i in range(0, NUMBER_JOBS_PER_STAGE): JOB_SIZES.append(4) def has_finished(state): state = state.lower() if state=="done" or state=="failed" or state=="canceled": return True else: return False """ This has been modified with a loop for all stages """ if __name__ == "__main__": # Store the start time and run time for all stages starttime=[] runtime=[] try: for stage in range(0, NUMBER_STAGES): print "Launching " + str(NUMBER_JOBS_PER_STAGE) + " jobs for Stage number " + str(stage) starttime.append(time.time()) # submit via mj abstraction resource_list = ( {"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "8", "allocation" : "loni_cybertools", "queue" : "workq", "re_agent": "/home/yye00/ICAC/bigjob/advert_launcher.sh"}, {"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "8", "allocation" : "loni_cybertools", "queue" : "workq", "re_agent": "/home/yye00/ICAC/bigjob/advert_launcher.sh"}) print "Create manyjob service " mjs = many_job.many_job_service(resource_list, None) jobs = [] job_start_times = {} job_states = {} cwd = os.getcwd() for i in range(0, NUMBER_JOBS_PER_STAGE): # create job description jd = saga.job.description() jd.executable = SIMULATION_EXE jd.number_of_processes = str(JOB_SIZES[i]) jd.spmd_variation = "mpi" # toggle between these two statements depending on the parfile directory #jd.arguments = [SIMULATION_PAR_FILE_SUFFIX+"%05d" %i +".par"] jd.arguments=[SIMULATION_PAR_FILE_SUFFIX+"All.par"] jd.working_directory = WORK_DIR + SIMULATION_DIR_SUFFIX + "%05d" %i + "/" jd.output = jd.working_directory + "stdout.txt" jd.error = jd.working_directory + "stderr.txt" subjob = mjs.create_job(jd) subjob.run() print "Submited sub-job " + "%05d"%i + "." jobs.append(subjob) job_start_times[subjob]=time.time() job_states[subjob] = subjob.get_state() print "************************ All Jobs submitted ************************" print "And now we wait for each stage to finish" while 1: finish_counter=0 result_map = {} for i in range(0, NUMBER_JOBS_PER_STAGE): old_state = job_states[jobs[i]] state = jobs[i].get_state() if result_map.has_key(state) == False: result_map[state]=0 result_map[state] = result_map[state]+1 #print "counter: " + str(i) + " job: " + str(jobs[i]) + " state: " + state if old_state != state: print "Job " + str(jobs[i]) + " changed from: " + old_state + " to " + state if old_state != state and has_finished(state)==True: print "Job: " + str(jobs[i]) + " Runtime: " + str(time.time()-job_start_times[jobs[i]]) + " s." if has_finished(state)==True: finish_counter = finish_counter + 1 job_states[jobs[i]]=state print "Current states: " + str(result_map) time.sleep(5) if finish_counter == NUMBER_JOBS_PER_STAGE: break runtime.append( time.time()-starttime[stage]) print "Runtime for Stage "+ str(stage)+ " is: " + str(runtime[stage]) + " s; Runtime per Job: " + str(runtime[stage]/NUMBER_JOBS_PER_STAGE) # Now we finished running all the executables, we need to do the checking print "finished with Stage number: " + str(stage) mjs.cancel() except: traceback.print_exc(file=sys.stdout) try: mjs.cancel() except: pass