Package bigjob_dynamic :: Module many_job

Source Code for Module bigjob_dynamic.many_job

#!/usr/bin/env python

"""Dynamic BigJob (ManyJob): manages multiple BigJobs (on different resources).
Sub-jobs are distributed across the set of BJs managed by the dynamic BJ.
"""
import pdb
import sys
import os
sys.path.append(os.path.dirname(__file__))
import getopt
import saga
import time
import uuid
import socket
import traceback

import Queue
import threading
import logging
import math
import operator
import copy


import bigjob.bigjob_manager

# Log everything, and send it to stderr.
logging.basicConfig(level=logging.DEBUG)

COORDINATION_URL = "advert://advert.cct.lsu.edu:8080"

class many_job_service(object):

    def __init__(self, bigjob_list, coordination_url):
        """ accepts a list of resource dictionaries, e.g.:
            ( {"resource_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_of_processes" : "32", "allocation" : "loni_stopgap2", "queue" : "workq", "bigjob_agent": "$(HOME)/src/REMDgManager/bigjob/advert_launcher.sh", "walltime":1000},
              {"resource_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_of_processes" : "32", "allocation" : "loni_stopgap2", "queue" : "workq", "bigjob_agent": "$(HOME)/src/REMDgManager/bigjob/advert_launcher.sh", "walltime":1000} )
        """
        self.uuid = uuid.uuid1()

        if coordination_url is None:
            self.advert_host = COORDINATION_URL
        else:
            self.advert_host = coordination_url

        # list of resource dicts (1 dict per resource)
        # will also store the state of each bigjob
        self.bigjob_list = copy.deepcopy(bigjob_list)

        # state variables for sub-job bookkeeping
        self.active_subjob_list = []
        self.subjob_bigjob_dict = {}

        # queue contains unscheduled subjobs
        self.subjob_queue = Queue.Queue()

        # submit bigjobs to resources
        self.__init_bigjobs()

        # thread which tries to (re-)schedule queued sub-jobs
        self.stop = threading.Event()
        self.rescheduler_thread = threading.Thread(target=self.__reschedule_subjobs_thread)
        self.rescheduler_thread.start()

        # last queue size
        self.last_queue_size = 0
        self.submission_times = []

    def __init_bigjobs(self):
        """ start a bigjob on each of the specified resources """
        self.bigjob_list = self.__schedule_bigjobs()
        for i in self.bigjob_list:
            self.__start_bigjob(i)

    def __start_bigjob(self, bj_dict):
        """ private method: starts a bigjob on the defined resource

            required keys: resource_url, number_of_processes, queue, allocation
            optional keys: processes_per_node (default "1"), walltime (default 3600),
                           working_directory (default $PWD/agent)
        """
        gram_url = bj_dict["resource_url"]
        logging.debug("start bigjob at: " + gram_url)
        bj = bigjob.bigjob_manager.bigjob(self.advert_host)
        ppn = "1"
        if "processes_per_node" in bj_dict:
            ppn = bj_dict["processes_per_node"]
        else:
            bj_dict["processes_per_node"] = "1"

        walltime = 3600
        if "walltime" in bj_dict:
            walltime = bj_dict["walltime"]

        working_directory = os.getcwd() + "/agent"
        if "working_directory" in bj_dict:
            working_directory = bj_dict["working_directory"]

        bj.start_pilot_job(gram_url,
                           None,
                           bj_dict["number_of_processes"],
                           bj_dict["queue"],
                           bj_dict["allocation"],
                           working_directory,
                           None,
                           walltime,
                           ppn)
        bj_dict["bigjob"] = bj  # store bigjob for later reference in dict
        bj_dict["free_cores"] = int(bj_dict["number_of_processes"])
        bj_dict["to_be_terminated"] = False
        # lock for modifying the number of free cores
        bj_dict["lock"] = threading.Lock()

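    # Note: after __start_bigjob() returns, the resource dictionary additionally
    # carries the bookkeeping keys set above: "bigjob" (the pilot-job object),
    # "free_cores", "to_be_terminated" and "lock".
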
    def add_resource(self, resource_dictionary):
        """ adds the bigjob described by resource_dictionary to the managed resources """
        bj_dict = copy.deepcopy(resource_dictionary)

        self.__start_bigjob(bj_dict)
        self.bigjob_list.append(bj_dict)
        return bj_dict["bigjob"]

    def remove_resource(self, bigjob):
        """ removes a bigjob from the resource list of the manyjob """
        # mark bigjob for termination (it is stopped only after all of its
        # sub-jobs have finished)
        bigjob["to_be_terminated"] = True

    def __cleanup_resources(self):
        """ called periodically from the scheduling thread;
            terminates big-jobs which are marked for termination and
            no longer have any running sub-jobs
        """
        # iterate over a copy of the list, but remove from the original list
        for i in self.bigjob_list[:]:
            if i["to_be_terminated"] == True:
                bj = i["bigjob"]
                total_cores = int(i["number_of_processes"])
                if i["free_cores"] == total_cores and "bj_stopped" not in i:
                    logging.debug("***Stop BigJob: " + str(bj.pilot_url))
                    # release resources of pilot job
                    bj.stop_pilot_job()
                    i["bj_stopped"] = True
                    #self.bigjob_list.remove(i)

    def get_resources(self):
        """ returns the list of bigjob dictionaries;
            one dictionary exists for each managed bigjob
        """
        return self.bigjob_list

    def list_bigjobs(self):
        """ returns a list of bigjob objects """
        return [i["bigjob"] for i in self.bigjob_list]

    def __schedule_bigjobs(self):
        """ prioritizes bigjob_list (the bigjob with the shortest expected delay gets index 0) """
        # no scheduling for now (start bigjobs in the user-specified order)
        return self.bigjob_list

    def create_job(self, job_description):
        subjob = sub_job(self, job_description, self.advert_host)
        return subjob

    def __run_subjob(self, subjob):
        # select appropriate bigjob
        st = time.time()
        bigjob_info = self.__schedule_subjob(subjob)
        job = subjob.job
        if bigjob_info is None:
            return job

        # create subjob on bigjob
        bj = bigjob_info["bigjob"]

        job.submit_job(bj.pilot_url, subjob.job_description)
        self.submission_times.append(time.time() - st)

        # store reference to subjob for further bookkeeping
        self.active_subjob_list.append(subjob)
        self.subjob_bigjob_dict[subjob] = bigjob_info
        logging.debug("Subjob submission time: " + str(time.time() - st) + " sec.")
        return job

    def queue_subjob(self, subjob):
        subjob.job = bigjob.bigjob_manager.subjob(self.advert_host)
        self.subjob_queue.put(subjob)
        return subjob.job

    def __schedule_subjob(self, subjob):
        """ find a resource (bigjob) for the subjob;
            returns the bigjob dictionary, or None if no resource is available
        """
        for i in self.bigjob_list:
            bj = i["bigjob"]
            lock = i["lock"]
            lock.acquire()
            free_cores = i["free_cores"]
            bigjob_url = bj.pilot_url
            state = bj.get_state_detail()
            logging.debug("Big Job: " + bigjob_url + " Cores: " + "%s" % free_cores + "/"
                          + str(int(i["number_of_processes"]))
                          + " State: " + str(state) + " Terminated: " + str(i["to_be_terminated"])
                          + " #Required Cores: " + subjob.job_description.number_of_processes
                          )
            if (state.lower() == "running" and free_cores >= int(subjob.job_description.number_of_processes)
                    and i["to_be_terminated"] == False):
                logging.debug("FOUND match - dispatch to BigJob: " + bigjob_url)
                free_cores = free_cores - int(subjob.job_description.number_of_processes)
                i["free_cores"] = free_cores
                lock.release()
                return i

            lock.release()

        # no resource found
        self.subjob_queue.put(subjob)
        logging.debug("found no active resource for sub-job => (re-)queue it")
        return None

    def __check_subjobs_states(self):
        """ iterate through all sub-jobs and check their states """
        for i in self.active_subjob_list:
            try:
                #logging.debug("get job state")
                state = i.job.get_state()
                #logging.debug("check job state")
                if self.__has_finished(state) == True:
                    #logging.debug("free resources")
                    self.__free_resources(i)
            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                traceback.print_exception(exc_type, exc_value, exc_traceback,
                                          limit=2, file=sys.stderr)

    def __free_resources(self, subjob):
        """ free the resources taken by a finished subjob """
        if subjob in self.subjob_bigjob_dict:
            logging.debug("job: " + str(subjob) + " done - free resources")
            bj_dict = self.subjob_bigjob_dict[subjob]
            lock = bj_dict["lock"]
            lock.acquire()
            free_cores = bj_dict["free_cores"]
            free_cores = free_cores + int(subjob.job_description.number_of_processes)
            bj_dict["free_cores"] = free_cores
            del self.subjob_bigjob_dict[subjob]
            lock.release()
            print "Freed resource - new state: Big Job: " + bj_dict["bigjob"].pilot_url + " Cores: " + "%s" % free_cores + "/" + str(int(bj_dict["number_of_processes"]))

    def __reschedule_subjobs_thread(self):
        """ periodically checks subjob_queue for unscheduled sub-jobs;
            if an unscheduled sub-job exists, it is scheduled
        """

        while self.stop.isSet() == False:
            logging.debug("Reschedule Thread")
            # check sub-job states
            self.__check_subjobs_states()
            # remove unneeded big-jobs
            self.__cleanup_resources()
            # blocks until an item is available; cancel() enqueues a dummy
            # object to unblock this get()
            subjob = self.subjob_queue.get()
            # check whether this is a real subjob object
            if isinstance(subjob, sub_job):
                self.__run_subjob(subjob)
            if self.last_queue_size == self.subjob_queue.qsize() or self.__get_total_free_cores() == 0:
                time.sleep(2)  # sleep 2 s

        logging.debug("Re-Scheduler terminated")

    def __get_free_cores(self, bj_dict):
        """ returns the number of free cores if the bigjob is active, 0 otherwise """
        #pdb.set_trace()
        if (bj_dict["bigjob"].get_state_detail().lower() == "running"
                and bj_dict["to_be_terminated"] == False):
            return bj_dict["free_cores"]

        return 0

    def __get_total_free_cores(self):
        """ gets the total number of free cores from all active bigjobs """
        free_cores = map(self.__get_free_cores, self.bigjob_list)
        #print "Free cores: " + str(free_cores)
        if len(free_cores) > 0:
            total_free_cores = reduce(lambda x, y: x + y, free_cores)
            logging.debug("free_cores: " + str(free_cores) + " total_free_cores: " + str(total_free_cores))
            return total_free_cores
        return 0

    def cancel(self):
        logging.debug("Cancel re-scheduler thread")
        self.stop.set()
        # put a dummy object into the queue to unblock the get() operation
        self.subjob_queue.put("dummy")
        self.rescheduler_thread.join()
        logging.debug("Cancel many-job: kill all bigjobs")
        for i in self.bigjob_list:
            bj = i["bigjob"]
            bj.cancel()
        self.print_stats(self.submission_times, "Submission Times")

    def print_stats(self, times, description):
        try:
            n = len(times)
            total = reduce(operator.add, times)
            mean = total / n
            stdev = 0
            if n > 1:
                variance = 0
                for i in times:
                    variance += (i - mean) ** 2
                variance /= (n - 1)
                stdev = math.sqrt(variance)
            print description + " Average: " + str(mean) + " Stdev: " + str(stdev)
        except:
            pass
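
    # Worked example: times = [1.0, 2.0, 3.0] gives n = 3, mean = 2.0 and a
    # sample standard deviation of sqrt(((-1.0)**2 + 0.0 + (1.0)**2) / 2) = 1.0,
    # printed as "<description> Average: 2.0 Stdev: 1.0".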

    def __has_finished(self, state):
        """ returns True for terminal sub-job states """
        state = state.lower()
        return state == "done" or state == "failed" or state == "canceled"

    def __repr__(self):
        return str(self.uuid)

    def __del__(self):
        self.cancel()


class sub_job(object):
    """ Class for controlling individual sub-jobs """

    def __init__(self, manyjob, job_description, advert_host):
        # store manyjob for reference
        self.manyjob = manyjob

        # init sub-job via advert
        self.job_description = job_description
        self.job = None

    def run(self):
        # queue the sub-job; the manyjob scheduler dispatches it to a bigjob
        self.job = self.manyjob.queue_subjob(self)

    def get_state(self):
        try:
            state = self.job.get_state()
            return state
        except:
            #traceback.print_stack()
            pass
        return "Unknown"

    def get_arguments(self):
        try:
            arguments = self.job.get_arguments()
            return arguments
        except:
            traceback.print_exc(file=sys.stdout)
        return ""

    def get_exe(self):
        try:
            exe = self.job.get_exe()
            return exe
        except:
            traceback.print_exc(file=sys.stdout)
        return "Unknown"

    def cancel(self):
        return self.job.cancel()

    def wait(self):
        while True:
            try:
                state = self.get_state()
                logging.debug("wait: state: " + state)
                # terminal-state check (same set as many_job_service.__has_finished,
                # which is not reachable from this class due to name mangling)
                if state.lower() in ("done", "failed", "canceled"):
                    break
                time.sleep(2)
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                pass

    def __del__(self):
        pass

    def __repr__(self):
        return str(self.job)


""" Test Job Submission via ManyJob abstraction """
if __name__ == "__main__":
    mjs = None
    try:
        print "Test ManyJob"
        # create job description
        jd = saga.job.description()
        jd.executable = "/bin/date"
        jd.number_of_processes = "1"
        jd.spmd_variation = "single"
        jd.arguments = [""]
        jd.working_directory = "/home/luckow"
        jd.output = "output.txt"
        jd.error = "error.txt"
        # submit via mj abstraction
        #resource_list = ( {"resource_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_of_processes" : "128", "allocation" : "<your allocation>", "queue" : "workq", "bigjob_agent": "$(HOME)/src/REMDgManager/bigjob/advert_launcher.sh"},
        #                  {"resource_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_of_processes" : "64", "allocation" : "<your allocation>", "queue" : "workq", "bigjob_agent": "$(HOME)/src/REMDgManager/bigjob/advert_launcher.sh"})
        resource_list = []
        resource_list.append({"resource_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_of_processes" : "16", "allocation" : "<your allocation>", "queue" : "workq", "bigjob_agent": os.getcwd() + "/bigjob_agent_launcher.sh"})
        print "Create manyjob service "
        mjs = many_job_service(resource_list, None)
        print "Create sub-job using manyjob " + str(mjs)
        subjob = mjs.create_job(jd)
        print "Run sub-job"
        subjob.run()
        print "Wait for termination"
        subjob.wait()
        print "Sub-job state: " + str(subjob.get_state())
        mjs.cancel()
    except:
        try:
            if mjs is not None:
                mjs.cancel()
        except:
            pass