#!/usr/bin/env python

3 """Dynamic BigJob (ManyJob): Manages multiple BigJob (on different resources).
4 Sub-jobs are distributed across the set of BJs managed by the dynamic BJ.
5 """

import sys
import os
sys.path.append(os.path.dirname(__file__))
import getopt
import saga
import time
import uuid
import socket
import traceback

import Queue
import threading
import logging
import math
import operator
import copy

import bigjob.bigjob_manager

logging.basicConfig(level=logging.DEBUG)

COORDINATION_URL = "advert://advert.cct.lsu.edu:8080"
32
34
35 - def __init__(self, bigjob_list, coordination_url):
36 """ accepts resource list as key/value pair:
37 ( {"resource_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_nodes" : "32", "allocation" : "loni_stopgap2", "queue" : "workq", "bigjob_agent": "$(HOME)/src/REMDgManager/bigjob/advert_launcher.sh", "walltime":1000},
38 {"resource_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_nodes" : "32", "allocation" : "loni_stopgap2", "queue" : "workq", "bigjob_agent": "$(HOME)/src/REMDgManager/bigjob/advert_launcher.sh", "walltime":1000})
39 """
40 self.uuid = uuid.uuid1()
41
42 if coordination_url==None:
43 self.advert_host=COORDINATION_URL
44 else:
45 self.advert_host=coordination_url
46
47
48
49 self.bigjob_list=copy.deepcopy(bigjob_list)
50
51
52 self.active_subjob_list = []
53 self.subjob_bigjob_dict = {}
54
55
56 self.subjob_queue = Queue.Queue()
57
58
59 self.__init_bigjobs()
60
61
62 self.stop=threading.Event()
63 self.rescheduler_thread=threading.Thread(target=self.__reschedule_subjobs_thread)
64 self.rescheduler_thread.start()
65
66
67 self.last_queue_size = 0
68 self.submisssion_times=[]

    def __init_bigjobs(self):
        """ starts a bigjob on each of the specified resources """
        self.bigjob_list = self.__schedule_bigjobs()
        for i in self.bigjob_list:
            self.__start_bigjob(i)

    def __start_bigjob(self, bj_dict):
        """ private method - starts a bigjob on the defined resource """
        gram_url = bj_dict["resource_url"]
        logging.debug("start bigjob at: " + gram_url)
        bj = bigjob.bigjob_manager.bigjob(self.advert_host)
        ppn = "1"
        if "processes_per_node" in bj_dict:
            ppn = bj_dict["processes_per_node"]
        else:
            bj_dict["processes_per_node"] = "1"

        walltime = 3600
        if "walltime" in bj_dict:
            walltime = bj_dict["walltime"]

        working_directory = os.getcwd() + "/agent"
        if "working_directory" in bj_dict:
            working_directory = bj_dict["working_directory"]

        bj.start_pilot_job(gram_url,
                           None,
                           bj_dict["number_of_processes"],
                           bj_dict["queue"],
                           bj_dict["allocation"],
                           working_directory,
                           None,
                           walltime,
                           ppn)
        bj_dict["bigjob"] = bj  # store bigjob for later reference
        bj_dict["free_cores"] = int(bj_dict["number_of_processes"])
        bj_dict["to_be_terminated"] = False
        # lock protecting the free core count
        bj_dict["lock"] = threading.Lock()
112 """ adds bigjob described in resource_dictionary to resources """
113 bj_dict = copy.deepcopy(resource_dictionary)
114
115 self.__start_bigjob(bj_dict)
116 self.bigjob_list.append(bj_dict)
117 return bj_dict["bigjob"]
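
    # Illustrative call (placeholder values; __start_bigjob reads "resource_url",
    # "number_of_processes", "queue" and "allocation", so all four are required):
    #   mjs.add_resource({"resource_url": "gram://qb1.loni.org/jobmanager-pbs",
    #                     "number_of_processes": "16",
    #                     "allocation": "<your allocation>",
    #                     "queue": "workq", "walltime": 3600})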

    def remove_resource(self, bigjob):
        """ removes bigjob from the resource list of the manyjob;
            the bigjob is only marked here and is terminated by the scheduler
            thread once all of its sub-jobs have finished """
        bigjob["to_be_terminated"] = True

    def __cleanup_resources(self):
        """ called periodically from the scheduling thread;
            terminates bigjobs which are marked for termination and don't
            have any running sub-jobs
        """
        # iterate over a copy of the list so entries can be modified safely
        for i in self.bigjob_list[:]:
            if i["to_be_terminated"] == True:
                bj = i["bigjob"]
                total_cores = int(i["number_of_processes"])
                if i["free_cores"] == total_cores and "bj_stopped" not in i:
                    logging.debug("***Stop BigJob: " + str(bj.pilot_url))
                    # release the resources held by the pilot job
                    bj.stop_pilot_job()
                    i["bj_stopped"] = True
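
    # Illustrative teardown flow (sketch; "bj" is one of the resource
    # dictionaries returned by get_resources(), not a tested configuration):
    #   bj = mjs.get_resources()[0]
    #   mjs.remove_resource(bj)   # only marks it; the scheduler thread stops
    #                             # the pilot once all of its sub-jobs finish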
146 """ returns list with bigjob dictionaries
147 for each managed bigjob 1 dictionary exists
148 """
149 return self.bigjob_list
150
151
153 """ returns a list of bigjob objects """
154 return [i["bigjob"] for i in self.bigjob_list]
155
157 """ prioritizes bigjob_list (bigjob with shortest expected delay will have index 0) """
158
159 return self.bigjob_list
160
164
166
        # select an appropriate bigjob
        st = time.time()
        bigjob_info = self.__schedule_subjob(subjob)
        job = subjob.job
        if bigjob_info is None:
            return job

        # submit the sub-job to the selected bigjob
        bj = bigjob_info["bigjob"]
        job.submit_job(bj.pilot_url, subjob.job_description)
        self.submission_times.append(time.time() - st)

        # store the sub-job reference for bookkeeping
        self.active_subjob_list.append(subjob)
        self.subjob_bigjob_dict[subjob] = bigjob_info
        logging.debug("Subjob submission time: " + str(time.time() - st) + " sec.")
        return job

    def queue_subjob(self, subjob):
        """ puts a sub-job into the queue of unscheduled jobs;
            the scheduler thread picks it up from there """
        self.subjob_queue.put(subjob)
        return subjob.job

    def __schedule_subjob(self, subjob):
        """ finds a resource (bigjob) for a sub-job;
            returns the bigjob dictionary or None if no resource is free """
        for i in self.bigjob_list:
            bigjob = i["bigjob"]
            lock = i["lock"]
            lock.acquire()
            free_cores = i["free_cores"]
            bigjob_url = bigjob.pilot_url
            state = bigjob.get_state_detail()
            logging.debug("Big Job: " + bigjob_url + " Cores: " + "%s" % free_cores + "/"
                          + str(int(i["number_of_processes"]))
                          + " State: " + str(state) + " Terminated: " + str(i["to_be_terminated"])
                          + " #Required Cores: " + subjob.job_description.number_of_processes)
            if (state.lower() == "running"
                    and free_cores >= int(subjob.job_description.number_of_processes)
                    and i["to_be_terminated"] == False):
                logging.debug("FOUND match - dispatch to BigJob: " + bigjob_url)
                # reserve the required cores (first fit)
                free_cores = free_cores - int(subjob.job_description.number_of_processes)
                i["free_cores"] = free_cores
                lock.release()
                return i

            lock.release()

        # no active resource found => (re-)queue the sub-job
        self.subjob_queue.put(subjob)
        logging.debug("found no active resource for sub-job => (re-) queue it")
        return None

    def __check_subjobs_states(self):
        """ iterates through all sub-jobs and checks their states """
        for i in self.active_subjob_list:
            try:
                state = i.job.get_state()
                if self.__has_finished(state) == True:
                    # release the cores held by the finished sub-job
                    self.__free_resources(i)
            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                traceback.print_exception(exc_type, exc_value, exc_traceback,
                                          limit=2, file=sys.stderr)

    def __free_resources(self, subjob):
        """ frees the resources taken by a finished sub-job """
        if subjob in self.subjob_bigjob_dict:
            logging.debug("job: " + str(subjob) + " done - free resources")
            bigjob = self.subjob_bigjob_dict[subjob]
            lock = bigjob["lock"]
            lock.acquire()
            free_cores = bigjob["free_cores"]
            free_cores = free_cores + int(subjob.job_description.number_of_processes)
            bigjob["free_cores"] = free_cores
            del self.subjob_bigjob_dict[subjob]
            lock.release()
            print "Freed resource - new state: Big Job: " + bigjob["bigjob"].pilot_url \
                  + " Cores: " + "%s" % free_cores + "/" + str(int(bigjob["number_of_processes"]))
251 """ periodically checks subjob_queue for unscheduled subjobs
252 if a unscheduled job exists it is scheduled
253 """
254
255 while True and self.stop.isSet()==False:
256 logging.debug("Reschedule Thread")
257
258 self.__check_subjobs_states()
259
260 self.__cleanup_resources()
261 subjob = self.subjob_queue.get()
262
263 if isinstance(subjob, sub_job):
264 self.__run_subjob(subjob)
265 if self.last_queue_size == self.subjob_queue.qsize() or self.__get_total_free_cores()==0:
266 time.sleep(2)
267
268 logging.debug("Re-Scheduler terminated")

    def __get_free_cores(self, bigjob):
        """ returns the number of free cores if the bigjob is active, 0 otherwise """
        if (bigjob["bigjob"].get_state_detail().lower() == "running"
                and bigjob["to_be_terminated"] == False):
            return bigjob["free_cores"]

        return 0

    def __get_total_free_cores(self):
        """ gets the total number of free cores over all active bigjobs """
        free_cores = map(self.__get_free_cores, self.bigjob_list)
        if len(free_cores) > 0:
            total_free_cores = reduce(lambda x, y: x + y, free_cores)
            logging.debug("free_cores: " + str(free_cores) + " total_free_cores: " + str(total_free_cores))
            return total_free_cores
        return 0

    def cancel(self):
        logging.debug("Cancel re-scheduler thread")
        self.stop.set()
        # unblock the scheduler thread, which may be waiting on the queue
        self.subjob_queue.put("dummy")
        self.rescheduler_thread.join()
        logging.debug("Cancel many-job: kill all bigjobs")
        for i in self.bigjob_list:
            bigjob = i["bigjob"]
            bigjob.cancel()
        self.print_stats(self.submission_times, "Submission Times")

    def print_stats(self, times, description):
        try:
            n = len(times)
            total = reduce(operator.add, times)
            mean = total / n
            variance = 0
            if n > 1:
                for i in times:
                    variance += (i - mean) ** 2
                variance /= (n - 1)
                # report the sample standard deviation
                variance = math.sqrt(variance)
            print description + " Average: " + str(mean) + " Stdev: " + str(variance)
        except:
            pass
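
    # Worked example (assuming three submission times of 1.0s, 2.0s and 3.0s):
    #   print_stats([1.0, 2.0, 3.0], "Submission Times")
    #   => "Submission Times Average: 2.0 Stdev: 1.0"
    #   (variance = ((1-2)^2 + (2-2)^2 + (3-2)^2) / (3-1) = 1.0; sqrt(1.0) = 1.0)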

    def __has_finished(self, state):
        state = state.lower()
        if state == "done" or state == "failed" or state == "canceled":
            return True
        else:
            return False

    def __repr__(self):
        return str(self.uuid)

    def __del__(self):
        self.cancel()


class sub_job(object):
    """ Class for controlling individual sub-jobs """

    def __init__(self, manyjob, job_description, advert_host):
        # store manyjob for later reference
        self.manyjob = manyjob

        # the sub-job itself is created lazily in run()
        self.job_description = job_description
        self.job = None

    def run(self):
        # create the sub-job and hand it to the manyjob scheduler queue
        self.job = bigjob.bigjob_manager.subjob(self.manyjob.advert_host)
        self.manyjob.queue_subjob(self)

    def get_state(self):
        try:
            state = self.job.get_state()
            return state
        except:
            pass
        return "Unknown"

    def get_arguments(self):
        try:
            arguments = self.job.get_arguments()
            return arguments
        except:
            traceback.print_exc(file=sys.stdout)
        return ""

    def get_exe(self):
        try:
            exe = self.job.get_exe()
            return exe
        except:
            traceback.print_exc(file=sys.stdout)
        return "Unknown"

    def cancel(self):
        # delegate to the underlying bigjob_manager sub-job
        return self.job.cancel()

    def wait(self):
        while 1:
            try:
                state = self.get_state()
                logging.debug("wait: state: " + state)
                if self.__has_finished(state) == True:
                    break
                time.sleep(2)
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                pass

    def __has_finished(self, state):
        # polled by wait(); mirrors many_job_service.__has_finished
        return state.lower() in ("done", "failed", "canceled")

    def __repr__(self):
        return str(self.job)

398 """ Test Job Submission via ManyJob abstraction """
399 if __name__ == "__main__":
400 try:
401 print "Test ManyJob"
402
403 jd = saga.job.description()
404 jd.executable = "/bin/date"
405 jd.number_of_processes = "1"
406 jd.spmd_variation = "single"
407 jd.arguments = [""]
408 jd.working_directory = "/home/luckow"
409 jd.output = "output.txt"
410 jd.error = "error.txt"
411
412
413
414 resource_list = []
415 resource_list.append({"resource_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_nodes" : "16", "allocation" : "<your allocation>", "queue" : "workq", "bigjob_agent": os.getcwd() + "/bigjob_agent_launcher.sh"})
416 print "Create manyjob service "
417 mjs = many_job_service(resource_list, None)
418 print "Create sub-job using manyjob " + str(mjs)
419 subjob = mjs.create_job(jd)
420 print "Run sub-job"
421 subjob.run()
422 print "Wait for termination"
423 subjob.wait()
424 print "Sub-job state: " + str(subjob.get_state())
425 mjs.cancel()
426 except:
427 try:
428 if mjs != None:
429 mjs.cancel()
430 except:
431 pass
432