1
2 """Dynamic BigJob (ManyJob) with affinity aware scheduler.
3 """
4
5 import os
6 import sys
7 sys.path.append(os.path.dirname( __file__ ))
8 from many_job import *
9
11
def __init__(self, bigjob_list, advert_host):
    """Create an affinity-aware ManyJob service.

    ``bigjob_list`` is a sequence of resource descriptions, one dict per
    BigJob, for example::

        ({"resource_url": "gram://qb1.loni.org/jobmanager-pbs",
          "number_cores": "32", "allocation": "<allocation>",
          "queue": "workq",
          "re_agent": os.getcwd() + "/bigjob_agent_launcher.sh",
          "walltime": 1000, "affinity": "affinity1"},
         {"resource_url": "gram://qb1.loni.org/jobmanager-pbs",
          "number_cores": "32", "allocation": "<allocation>",
          "queue": "workq",
          "re_agent": os.getcwd() + "/bigjob_agent_launcher.sh",
          "walltime": 1000, "affinity": "affinity1"})

    NOTE(review): the "affinity" label is presumably copied by the base
    service into each ``bigjob_list`` entry, where the scheduler compares
    it against a sub-job's ``affinity=<label>`` environment entry —
    confirm in the base class.

    ``bigjob_list`` and ``advert_host`` are passed unchanged to the base
    service's constructor.
    """
    super(many_job_affinity_service, self).__init__(bigjob_list, advert_host)
20
22 """ find resource (bigjob) with the same affinity for subjob
23 returns bigjob object """
24 for i in self.bigjob_list:
25 bigjob = i["bigjob"]
26 lock = i["lock"]
27 affinity = i["affinity"]
28 lock.acquire()
29 free_cores = i["free_cores"]
30 bigjob_url = bigjob.pilot_url
31 state = bigjob.get_state_detail()
32 env = subjob.job_description.environment[0]
33 sj_affinity=None
34 if env.find("affinity") >=0:
35 sj_affinity = env.split("=")[1]
36 logging.debug("Subjob Env: " + str(sj_affinity))
37 logging.debug("Big Job: " + bigjob_url + " Cores: " + "%s"%free_cores + "/" + i["number_cores"] + " Affinity: "
38 + affinity + " SJ affinity: " + sj_affinity + " State: " + state)
39 if state.lower() == "running" and free_cores >= int(subjob.job_description.number_of_processes) and affinity == sj_affinity:
40 free_cores = i["free_cores"]
41 free_cores = free_cores - int(subjob.job_description.number_of_processes)
42 i["free_cores"]=free_cores
43 lock.release()
44 return i
45
46 lock.release()
47
48
49 self.subjob_queue.put(subjob)
50 logging.debug("found no active resource for sub-job => (re-) queue it")
51 return None
52