Package examples :: Module example_manyjob_local
[hide private]
[frames] | [no frames]

Source Code for Module examples.example_manyjob_local

  1  #!/usr/bin/env python 
  2   
  3  """ 
  4  Dynamic BigJob (ManyJob) Example 
  5   
  6  This Module is used to launch a set of bigjobs. 
  7   
  8  DON'T EDIT THIS FILE (UNLESS THERE IS A BUG) 
  9       
 10  THIS FILE SHOULD NOT BE COMMITTED TO SVN WITH USE-SPECIFIC PATHS! 
 11   
 12  """ 
 13   
 14  import sys 
 15  import getopt 
 16  import time 
 17  import pdb 
 18  import os 
 19  import traceback 
 20  import logging 
 21   
 22   
 23  # BigJob implementation can be swapped here by importing another implementation, 
 24  # e.g. condor, cloud, azure 
 25  import sys 
 26   
 27  sys.path.insert(0, os.getcwd() + "/../") 
 28   
 29  from bigjob import bigjob, subjob, description 
 30  from bigjob_dynamic.many_job import * 
 31   
 32   
 33  """ This variable defines the coordination system that is used by BigJob 
 34      e.g.  
 35          advert://localhost (SAGA/Advert SQLITE) 
 36          advert://advert.cct.lsu.edu:8080 (SAGA/Advert POSTGRESQL) 
 37          redis://localhost:6379 (Redis at localhost) 
 38          tcp://localhost (ZMQ) 
 39  """ 
 40  COORDINATION_URL = "advert://localhost" 
 41  #COORDINATION_URL = "advert://advert.cct.lsu.edu:8080" 
 42       
 43  NUMBER_JOBS=8 
 44   
def has_finished(state):
    """Return True if *state* denotes a terminal job state.

    The comparison is case-insensitive; terminal states are
    "done", "failed" and "canceled".
    """
    return state.lower() in ("done", "failed", "canceled")
51
52 -def main():
53 try: 54 print "ManyJob load test with " + str(NUMBER_JOBS) + " jobs." 55 starttime=time.time() 56 57 """ submit via mj abstraction 58 59 resource_list.append( {"resource_url" : "gram://eric1.loni.org/jobmanager-pbs", "processes_per_node":"4", 60 "number_of_processes" : "4", "allocation" : None, "queue" : "workq", 61 "working_directory": (os.getcwd() + "/agent"), "walltime":10 }) 62 63 """ 64 resource_list = [] 65 resource_dictionary = {"resource_url" : "fork://localhost/", "number_of_processes" : "32", 66 "processes_per_node":"1", "allocation" : None, "queue" : None, 67 "working_directory": (os.getcwd() + "/agent"), "walltime":3600 } 68 resource_list.append(resource_dictionary) 69 70 71 #Flags for controlling dynamic BigJob 72 add_additional_resources=True 73 remove_additional_resources=False 74 75 76 print "Create Dynamic BigJob Service " 77 mjs = many_job_service(resource_list, COORDINATION_URL) 78 79 jobs = [] 80 job_start_times = {} 81 job_states = {} 82 cwd = os.getcwd() 83 for i in range(0, NUMBER_JOBS): 84 # create job description 85 jd = description() 86 jd.executable = "/bin/date" 87 jd.number_of_processes = "1" 88 jd.spmd_variation = "single" 89 jd.arguments = [""] 90 jd.working_directory = os.getcwd(); 91 jd.output = "stdout-" + str(i) + ".txt" 92 jd.error = "stderr-" + str(i) + ".txt" 93 subjob = mjs.create_job(jd) 94 subjob.run() 95 print "Submited sub-job " + "%d"%i + "." 
96 jobs.append(subjob) 97 job_start_times[subjob]=time.time() 98 job_states[subjob] = subjob.get_state() 99 print "************************ All Jobs submitted ************************" 100 while 1: 101 finish_counter=0 102 result_map = {} 103 for i in range(0, NUMBER_JOBS): 104 old_state = job_states[jobs[i]] 105 state = jobs[i].get_state() 106 if result_map.has_key(state) == False: 107 result_map[state]=0 108 result_map[state] = result_map[state]+1 109 #print "counter: " + str(i) + " job: " + str(jobs[i]) + " state: " + state 110 if old_state != state: 111 print "Job " + str(jobs[i]) + " changed from: " + old_state + " to " + state 112 if old_state != state and has_finished(state)==True: 113 print "Job: " + str(jobs[i]) + " Runtime: " + str(time.time()-job_start_times[jobs[i]]) + " s." 114 if has_finished(state)==True: 115 finish_counter = finish_counter + 1 116 job_states[jobs[i]]=state 117 118 # Dynamic BigJob add resources at runtime 119 # if more than 30 s - add additional resource 120 if time.time()-starttime > 10 and add_additional_resources==True: 121 print "***add additional resources***" 122 mjs.add_resource(resource_dictionary) 123 add_additional_resources=False 124 125 # remove resources from dynamic bigjob 126 if (time.time()-starttime > 15 and remove_additional_resources==True): 127 bj_list = mjs.get_resources() 128 if len(bj_list)>0: 129 print "***remove resources: " + str(bj_list[0]) 130 mjs.remove_resource(bj_list[0]) 131 remove_additional_resources=False 132 133 print "Current states: " + str(result_map) 134 time.sleep(5) 135 if finish_counter == NUMBER_JOBS: 136 break 137 138 mjs.cancel() 139 runtime = time.time()-starttime 140 print "Runtime: " + str(runtime) + " s; Runtime per Job: " + str(runtime/NUMBER_JOBS) 141 except: 142 traceback.print_exc(file=sys.stdout) 143 try: 144 mjs.cancel() 145 except: 146 pass
# Entry point: run the ManyJob submission test when executed as a script.
if __name__ == "__main__":
    main()