Package examples :: Module example_local_multiple
[hide private]
[frames] | [no frames]

Source Code for Module examples.example_local_multiple

  1  """ Example application demonstrating job submission via bigjob  
  2   
  3      DON'T EDIT THIS FILE (UNLESS THERE IS A BUG) 
  4       
  5      THIS FILE SHOULD NOT BE COMMITTED TO SVN WITH USER-SPECIFIC PATHS! 
  6  """ 
  7   
  8  import os 
  9  import time 
 10  import pdb 
 11  # BigJob implementation can be swapped here by importing another implementation, 
 12  # e.g. condor, cloud, azure 
 13  import sys 
 14  sys.path.insert(0, os.getcwd() + "/../") 
 15   
 16  # configuration 
 17  """ This variable defines the coordination system that is used by BigJob 
 18      e.g.  
 19          advert://localhost (SAGA/Advert SQLITE) 
 20          advert://advert.cct.lsu.edu:8080 (SAGA/Advert POSTGRESQL) 
 21          advert://advert.cct.lsu.edu:5432 (SAGA/Advert POSTGRESQL) 
 22          redis://localhost:6379 (Redis at localhost) 
 23          tcp://localhost (ZMQ) 
 24          tcp://* (ZMQ - listening to all interfaces) 
 25  """ 
 26   
 27  ### EDIT COORDINATION_URL to point to advert server.   
 28  #COORDINATION_URL = "advert://advert.cct.lsu.edu:5432/" 
 29  #COORDINATION_URL = "advert://advert.cct.lsu.edu:8080/" 
 30  COORDINATION_URL = "advert://localhost/?dbtype=sqlite3" 
 31   
 32  from bigjob import bigjob, subjob, description 
 33   
 34   
 35  ### EDIT based on the number of jobs you want to submit 
 36  NUMBER_JOBS=8 
 37   
def has_finished(state):
    """Return True if *state* denotes a terminal subjob state.

    The comparison is case-insensitive; the terminal states are
    "done", "failed" and "canceled".
    """
    # Membership test replaces the verbose if/else chain that returned
    # the boolean literals True/False explicitly.
    return state.lower() in ("done", "failed", "canceled")
""" Test Job Submission via Advert """
if __name__ == "__main__":

    starttime = time.time()

    ##########################################################################################
    # Edit parameters for BigJob
    queue = None                  # if None default queue is used
    project = None                # if None default allocation is used
    walltime = 100
    processes_per_node = 4
    number_of_processes = 2
    workingdirectory = os.path.join(os.getcwd(), "agent")  # working directory for agent
    userproxy = None              # userproxy (not supported yet due to context issue w/ SAGA)

    """
    URL of the SAGA Job Service that is used to dispatch the pilot job.
    The following URLs are accepted:

    lrms_url = "gram://oliver1.loni.org/jobmanager-pbs" # globus resource url used when globus is used. (LONI)
    lrms_url = "pbspro://louie1.loni.org" # pbspro resource url used when pbspro scheduling system is used.(Futuregrid or LSU Machines)
    lrms_url = "ssh://louie1.loni.org" # ssh resource url which launches jobs on target machine. Jobs not submitted to scheduling system.
    lrms_url = "pbs-ssh://louie1.loni.org" # Submit jobs to scheduling system of remote machine.
    lrms_url = "xt5torque://localhost" # torque resource url.

    Please ensure that the respective SAGA adaptor is installed and working
    """
    lrms_url = "fork://localhost"

    ##########################################################################################

    # Start the pilot job (BigJob agent) on the selected resource.
    print("Start Pilot Job/BigJob at: " + lrms_url)
    bj = bigjob(COORDINATION_URL)
    bj.start_pilot_job(lrms_url,
                       None,
                       number_of_processes,
                       queue,
                       project,
                       workingdirectory,
                       userproxy,
                       walltime,
                       processes_per_node)

    print("Pilot Job/BigJob URL: " + bj.pilot_url + " State: " + str(bj.get_state()))

    ##########################################################################################
    # Submit SubJobs through BigJob
    jobs = []
    job_start_times = {}
    job_states = {}
    for i in range(0, NUMBER_JOBS):
        jd = description()
        jd.executable = "/bin/date"
        jd.number_of_processes = "1"
        jd.spmd_variation = "single"
        jd.arguments = [""]
        jd.output = "sj-stdout-" + str(i) + ".txt"
        jd.error = "sj-stderr-" + str(i) + ".txt"

        sj = subjob()
        sj.submit_job(bj.pilot_url, jd)
        jobs.append(sj)
        job_start_times[sj] = time.time()
        job_states[sj] = sj.get_state()

    # busy wait for completion of all subjobs, polling every 2 seconds
    while True:
        finish_counter = 0
        result_map = {}  # state name -> number of subjobs currently in that state
        for i in range(0, NUMBER_JOBS):
            old_state = job_states[jobs[i]]
            state = jobs[i].get_state()
            #print("Job " + str(jobs[i]) + " state: " + state)
            # dict.has_key() is deprecated (removed in Python 3); "in" works everywhere.
            if state not in result_map:
                result_map[state] = 1
            else:
                result_map[state] = result_map[state] + 1
            #pdb.set_trace()
            if old_state != state:
                print("Job " + str(jobs[i]) + " changed from: " + old_state + " to " + state)
            if old_state != state and has_finished(state):
                print("Job: " + str(jobs[i]) + " Runtime: " + str(time.time() - job_start_times[jobs[i]]) + " s.")
            if has_finished(state):
                finish_counter = finish_counter + 1
            job_states[jobs[i]] = state

        if finish_counter == NUMBER_JOBS:
            break
        time.sleep(2)

    runtime = time.time() - starttime
    print("Runtime: " + str(runtime) + " s; Runtime per Job: " + str(runtime / NUMBER_JOBS))
    ##########################################################################################
    # Cleanup - stop BigJob
    bj.cancel()
    #time.sleep(30)