Package bigjob_dynamic :: Module many_job_affinity
[hide private]
[frames] | no frames]

Source Code for Module bigjob_dynamic.many_job_affinity

 1  #!/usr/bin/env python 
 2  """Dynamic BigJob (ManyJob) with affinity aware scheduler. 
 3  """ 
 4   
 5  import os 
 6  import sys 
 7  sys.path.append(os.path.dirname( __file__ )) 
 8  from many_job import * 
 9   
class many_job_affinity_service(many_job_service):
    """ManyJob service whose scheduler places sub-jobs by affinity.

    A sub-job is only placed on a resource (bigjob) whose "affinity"
    value equals the affinity requested in the sub-job's environment.
    """

    def __init__(self, bigjob_list, advert_host):
        """Initialise the service by delegating to ``many_job_service``.

        bigjob_list -- resource list, one key/value mapping per resource:
            ( {"resource_url" : "gram://qb1.loni.org/jobmanager-pbs",
               "number_cores" : "32", "allocation" : "<allocation>",
               "queue" : "workq",
               "re_agent": os.getcwd() + "/bigjob_agent_launcher.sh",
               "walltime": 1000, "affinity": "affinity1"},
              {"resource_url" : "gram://qb1.loni.org/jobmanager-pbs",
               "number_cores" : "32", "allocation" : "<allocation>",
               "queue" : "workq",
               "re_agent": os.getcwd() + "/bigjob_agent_launcher.sh",
               "walltime": 1000, "affinity": "affinity1"} )
        advert_host -- passed through unchanged to the parent constructor.
        """
        super(many_job_affinity_service, self).__init__(bigjob_list, advert_host)

    def __get_subjob_affinity(self, subjob):
        """Return the affinity requested by *subjob*, or None.

        Only the first entry of ``subjob.job_description.environment`` is
        inspected. It is treated as an affinity setting when it contains
        the substring "affinity"; the value is the text between the first
        and second "=". None is returned when the environment is empty,
        the first entry does not mention affinity, or it has no "=".
        """
        environment = subjob.job_description.environment
        if not environment:
            return None
        first_entry = environment[0]
        if first_entry.find("affinity") < 0:
            return None
        fields = first_entry.split("=")
        if len(fields) < 2:
            return None
        return fields[1]

    # NOTE(review): the double-underscore name is mangled to
    # _many_job_affinity_service__schedule_subjob. If many_job_service
    # invokes its scheduler as self.__schedule_subjob(...), that call
    # resolves to the parent's own method and this override is never
    # reached -- confirm against many_job.py.
    def __schedule_subjob(self, subjob):
        """Find a resource (bigjob) with the same affinity as *subjob*.

        Walks ``self.bigjob_list`` in order. Under each entry's lock, the
        entry is selected when its bigjob reports the state "running"
        (case-insensitive), it has at least as many free cores as the
        sub-job's ``number_of_processes``, and its "affinity" equals the
        sub-job's affinity. The required cores are then subtracted from
        the entry's "free_cores".

        Returns the selected ``bigjob_list`` entry (the dict holding
        "bigjob", "lock", "free_cores", ...). If no entry qualifies, the
        sub-job is put back on ``self.subjob_queue`` and None is returned.
        """
        sj_affinity = self.__get_subjob_affinity(subjob)
        logging.debug("Subjob Env: %s", sj_affinity)

        for i in self.bigjob_list:
            bigjob = i["bigjob"]
            lock = i["lock"]
            affinity = i["affinity"]
            lock.acquire()
            # try/finally guarantees the lock is released on every exit
            # path, including exceptions raised while querying the bigjob.
            try:
                free_cores = i["free_cores"]
                bigjob_url = bigjob.pilot_url
                state = bigjob.get_state_detail()
                # %-style arguments keep logging from failing when a value
                # is None or not a string (e.g. sub-job without affinity).
                logging.debug(
                    "Big Job: %s Cores: %s/%s Affinity: %s SJ affinity: %s State: %s",
                    bigjob_url, free_cores, i["number_cores"],
                    affinity, sj_affinity, state)
                if (state.lower() == "running"
                        and free_cores >= int(subjob.job_description.number_of_processes)
                        and affinity == sj_affinity):
                    # Reserve the cores while still holding the lock.
                    i["free_cores"] = free_cores - int(subjob.job_description.number_of_processes)
                    return i
            finally:
                lock.release()

        # no resource found
        self.subjob_queue.put(subjob)
        logging.debug("found no active resource for sub-job => (re-) queue it")
        return None
52