Package examples :: Module user_bigjob
[hide private]
[frames] | no frames]

Source Code for Module examples.user_bigjob

  1  #!/usr/bin/env python 
  2   
  3   
  4  ############################################################# 
  5  # SAGA ManyJob  
  6  ############################################################# 
  7   
  8   
  9  print("############################################################") 
 10  print("Make sure you are running this example on head node") 
 11  print("############################################################") 
 12   
 13  # import required packages and check for their availability 
 14  try: 
 15      import sys 
 16      import getopt 
 17      import saga 
 18      import time 
 19      import pdb 
 20      import os 
 21      import traceback 
 22      import logging 
 23      import sys 
 24  except: 
 25      print("Error: Libraries are missing. Please check SAGA Installation, SAGA Python bindings and other imported packages") 
 26      print(" Error :Please launch python; import saga; if it works then python bindings & saga installation is successful") 
 27      sys.exit(0); 
 28   
 29  ############################################################# 
 30  #  Set BigJob Path 
 31  ############################################################# 
 32  # for running BJ from local dir 
 33  sys.path.insert(0, os.getcwd() + "/../") 
 34  try: 
 35      from bigjob.bigjob_manager import * 
 36      from bigjob_dynamic.many_job import * 
 37  except: 
 38      print("Error: SAGA BigJob & ManyJob modules are not found") 
 39   
 40  ############################################################# 
 41  # Set number of jobs 
 42  ############################################################# 
 43   
 44  NUMBER_JOBS=8 
 45   
 46  ############################################################# 
 47  # Grid certificate validation 
 48  ############################################################# 
 49  grid_validity=os.system("globus-job-run localhost /bin/date > /dev/null 2>&1") 
 50  if grid_validity != 0: 
 51      print(" Grid proxy not initated. Please use grid-proxy-init to initate grid proxy") 
 52      
 53   
 54  ############################################################ 
 55  # Functions used to determine state of jobs 
 56  ############################################################ 
 57   
58 -def has_finished(state):
59 state = state.lower() 60 if state=="done" or state=="failed" or state=="canceled": 61 return True 62 else: 63 return False
def _build_resource_list():
    """Return the resource descriptions handed to the ManyJob service.

    Each entry is a dict with the keys:
      resource_url        - machine on which you want to run the job
      number_of_processes - total number of processes requested on that machine
      processes_per_node  - processes placed on each node
      allocation          - if None then the default allocation is used
      queue               - if None then the default queue is used
      working_directory   - directory used by the BigJob agent
      walltime            - requested wall time (units as defined by BigJob's
                            resource description; presumably minutes -- confirm)
    """
    # Alternative resources (edit and uncomment to use; note BIGJOB_HOME is
    # not defined in this script):
    #resource_list.append( {"resource_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_nodes" : "64", "allocation" : "<your allocation>", "queue" : "workq", "bigjob_agent": (os.getcwd() + "/bigjob_agent_launcher.sh") , "working_directory": (os.getcwd() + "/agent"), "walltime":10 })
    #resource_list.append({"resource_url" : "gram://oliver1.loni.org/jobmanager-pbs", "number_nodes" : "1", "processes_per_node":"4", "allocation" : None, "queue" : None, "bigjob_agent": (BIGJOB_HOME + "/bigjob_agent_launcher.sh"), "working_directory": (os.getcwd() + "/agent"), "walltime":30 })
    #resource_list.append({"resource_url" : "gram://eric1.loni.org/jobmanager-pbs", "number_nodes" : "2", "processes_per_node":"4", "allocation" : None, "queue" : "checkpt", "bigjob_agent": (BIGJOB_HOME + "/bigjob_agent_launcher.sh"), "working_directory": (os.getcwd() + "/agent"), "walltime":4320 })
    eric1 = {
        "resource_url": "gram://eric1.loni.org/jobmanager-pbs",
        "number_of_processes": "4",
        "processes_per_node": "4",
        "allocation": None,
        "queue": "checkpt",
        "working_directory": os.path.join(os.getcwd(), "agent"),
        "walltime": 20,
    }
    # Two identical requests for the same machine, as separate dict objects.
    return [dict(eric1), dict(eric1)]


def _submit_jobs(mjs, number_jobs):
    """Create and start ``number_jobs`` single-process /bin/date sub-jobs.

    Sub-job ``i`` writes its output to stdout-<i>.txt / stderr-<i>.txt in
    the current working directory.

    :param mjs: the ManyJob service returned by ``many_job_service``
    :param number_jobs: how many sub-jobs to submit
    :returns: list of submitted sub-job objects, in submission order
    """
    jobs = []
    for i in range(number_jobs):
        jd = saga.job.description()
        jd.executable = "/bin/date"
        jd.number_of_processes = "1"
        jd.spmd_variation = "single"
        jd.arguments = [""]
        jd.working_directory = os.getcwd()
        jd.output = "stdout-" + str(i) + ".txt"
        jd.error = "stderr-" + str(i) + ".txt"
        subjob = mjs.create_job(jd)
        subjob.run()
        print("Submited sub-job " + str(i) + ".")
        jobs.append(subjob)
    return jobs


def _report_states(jobs):
    """Print one status row per sub-job and tally their states.

    :param jobs: sub-job objects to query
    :returns: ``(result_map, finished)`` where ``result_map`` maps each
        observed state to its count and ``finished`` is the number of
        sub-jobs in a terminal state (see ``has_finished``)
    """
    result_map = {}
    finished = 0
    print("JOB_ID Status Machine Job_Attributes")
    print("==========================================================")
    for job in jobs:
        state = job.get_state()
        result_map[state] = result_map.get(state, 0) + 1

        # The row is built from colon-separated fields of the job's string
        # form (index 2 = machine, index 4 = job id, per the original
        # indexing).  Skip the row rather than crash when the string is
        # "None" or has too few fields.
        k = str(job)
        fields = k.split(":")
        if k != "None" and len(fields) > 4:
            jobid = fields[4]
            machine = fields[2]
            exe = job.get_exe()
            arguments = job.get_arguments()
            print(str(jobid) + " " + "%7s %10s %10s,%6s" % (
                str(state), str(machine), str(exe), str(arguments)))

        if has_finished(state):
            finished += 1
    return result_map, finished


if __name__ == "__main__":
    # Bound before the try so the cleanup path can tell whether a service
    # was ever created.
    mjs = None
    try:
        print(" Number of jobs submitted " + str(NUMBER_JOBS) + " jobs.")
        starttime = time.time()

        resource_list = _build_resource_list()

        print("Create manyjob service ")
        # NOTE(review): COORDINATION_URL is not defined in this module; it is
        # presumably supplied by one of the bigjob star imports -- confirm.
        mjs = many_job_service(resource_list, COORDINATION_URL)

        jobs = _submit_jobs(mjs, NUMBER_JOBS)
        print("************************ All Jobs submitted ************************")

        # Poll every 5 s until all sub-jobs are terminal; check before
        # sleeping so no extra wait is added once everything has finished.
        while True:
            result_map, finished = _report_states(jobs)
            print("Current states: " + str(result_map))
            if finished == len(jobs):
                break
            time.sleep(5)

        print(" All Jobs completed. Cleaning up the bigjob...")
        mjs.cancel()
        runtime = time.time() - starttime
        print("Runtime: " + str(runtime) + " s; Runtime per Job: " + str(runtime / NUMBER_JOBS))
    except (Exception, KeyboardInterrupt):
        # KeyboardInterrupt is included so Ctrl-C still triggers cleanup.
        traceback.print_exc(file=sys.stdout)
        if mjs is not None:
            # Best-effort cancel; report (rather than hide) a failure here.
            try:
                mjs.cancel()
            except Exception:
                traceback.print_exc(file=sys.stdout)