Package examples :: Module example_manyjob_affinity
[hide private]
[frames | no frames]

Source Code for Module examples.example_manyjob_affinity

  1  #!/usr/bin/env python 
  2   
  3  """many_job example with affinity. 
  4   
  5  This Module is used to launch a set of bigjobs. 
  6   
  7  """ 
  8  import getopt 
  9  import time 
 10  import pdb 
 11  import os 
 12  import traceback 
 13  import logging 
 14  import sys 
 15   
 16   
 17  sys.path.insert(0, os.getcwd() + "/../") 
 18   
 19  # Big Job Imports 
 20  from bigjob import bigjob, subjob, description 
 21  from bigjob_dynamic.many_job_affinity import * 
 22   
 23   
 24  """ This variable defines the coordination system that is used by BigJob 
 25      e.g.  
 26          advert://localhost (SAGA/Advert SQLITE) 
 27          advert://advert.cct.lsu.edu:8080 (SAGA/Advert POSTGRESQL) 
 28          redis://localhost:6379 (Redis at localhost) 
 29          tcp://localhost (ZMQ) 
 30  """ 
# Coordination backend URL; passed as the second argument to
# many_job_affinity_service() when the script runs as __main__.
COORDINATION_URL = "advert://localhost/?dbtype=sqlite3"

# Number of sub-jobs the script submits and then polls until all have finished.
NUMBER_JOBS=8
 34   
 35   
def has_finished(state):
    """Report whether *state* names a terminal job state.

    The comparison ignores case: "done", "failed" and "canceled" count as
    finished; any other value does not.

    Returns True for a terminal state, False otherwise.
    """
    return state.lower() in ("done", "failed", "canceled")
# Test job submission via the ManyJob abstraction:
#   1. create a many_job_affinity_service on one local "fork" resource,
#   2. submit NUMBER_JOBS sub-jobs that each run /bin/date,
#   3. poll every 5 seconds until all sub-jobs reach a terminal state,
#   4. cancel the service and print the total and per-job runtime.
if __name__ == "__main__":
    # Bound before the try block so the error handler can tell whether the
    # service was ever created before attempting to cancel it.
    mjs = None
    try:
        print("ManyJob load test with " + str(NUMBER_JOBS) + " jobs.")
        starttime = time.time()

        # submit via mj abstraction
        resource_list = []
        resource_list.append({"resource_url": "fork://localhost/",
                              "number_of_processes": "2",
                              "allocation": "myAllocation",
                              "queue": "workq",
                              "working_directory": (os.getcwd() + "/agent"),
                              "walltime": 10,
                              "affinity": "affinity1"})

        #resource_list.append( {"resource_url" : "gram://oliver1.loni.org/jobmanager-pbs", "number_nodes" : "4", "allocation" : "<your allocation>",
        #                       "queue" : "workq", "bigjob_agent": (BIGJOB_HOME+"/bigjob_agent_launcher.sh"),
        #                       "working_directory": (os.getcwd() + "/agent"), "walltime":10, "affinity" : "affinity1"})

        print("Create manyjob service ")
        mjs = many_job_affinity_service(resource_list, COORDINATION_URL)

        jobs = []
        job_start_times = {}   # sub-job -> submission timestamp
        job_states = {}        # sub-job -> last observed state
        cwd = os.getcwd()
        for i in range(0, NUMBER_JOBS):
            # create job description
            jd = description()
            jd.executable = "/bin/date"
            jd.number_of_processes = "1"
            jd.spmd_variation = "single"
            jd.arguments = [""]
            # NOTE(review): hard-coded path from the original author's machine;
            # presumably needs to be adapted before running elsewhere.
            jd.working_directory = "/Users/luckow"
            jd.output = cwd + "/stdout-" + str(i) + ".txt"
            jd.error = cwd + "/stderr-" + str(i) + ".txt"
            jd.environment = ["affinity=affinity1"]
            # Named "job" (not "subjob") so the subjob name imported from
            # bigjob is not rebound.
            job = mjs.create_job(jd)
            job.run()
            print("Submited sub-job " + "%d" % i + ".")
            jobs.append(job)
            job_start_times[job] = time.time()
            job_states[job] = job.get_state()
        print("************************ All Jobs submitted ************************")
        while 1:
            finish_counter = 0
            result_map = {}    # state -> number of sub-jobs currently in it
            for job in jobs:
                old_state = job_states[job]
                state = job.get_state()
                result_map[state] = result_map.get(state, 0) + 1
                #print "counter: " + str(i) + " job: " + str(jobs[i]) + " state: " + state
                if old_state != state:
                    print("Job " + str(job) + " changed from: " + old_state + " to " + state)
                    if has_finished(state):
                        print("Job: " + str(job) + " Runtime: " + str(time.time() - job_start_times[job]) + " s.")
                if has_finished(state):
                    finish_counter = finish_counter + 1
                job_states[job] = state

            print("Current states: " + str(result_map))
            # Check for completion before sleeping so the measured runtime is
            # not inflated by one extra polling interval.
            if finish_counter == NUMBER_JOBS:
                break
            time.sleep(5)

        mjs.cancel()
        runtime = time.time() - starttime
        print("Runtime: " + str(runtime) + " s; Runtime per Job: " + str(runtime / NUMBER_JOBS))
    except:
        # Deliberately bare: any failure, including Ctrl-C, should still
        # attempt to cancel the service.
        traceback.print_exc(file=sys.stdout)
        if mjs is not None:
            try:
                mjs.cancel()
            except Exception:
                # Best-effort cleanup: report the failure but do not raise.
                traceback.print_exc(file=sys.stdout)