Package bigjob :: Module bigjob_agent
[hide private]
[frames] | [no frames]

Source Code for Module bigjob.bigjob_agent

  1  #!/usr/bin/env python 
  2   
  3  """bigjob_agent: bigjob agent that is executed on the resource 
  4  """ 
  5   
  6   
  7  import sys 
  8  import os 
  9  import bigjob.state 
 10  import socket 
 11  import threading 
 12  import time 
 13  import pdb 
 14  import traceback 
 15  import ConfigParser 
 16  import types 
 17  import logging 
 18  logging.basicConfig(level=logging.DEBUG) 
 19   
 20  try: 
 21      import saga 
 22  except: 
 23      logging.warning("SAGA could not be found. Not all functionalities working") 
 24   
 25  sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../ext/threadpool-1.2.7/src/") 
 26  logging.debug(str(sys.path)) 
 27  from threadpool import * 
 28  from bigjob import logger 
 29   
 30  if sys.version_info < (2, 5): 
 31      sys.path.append(os.path.dirname( __file__ ) + "/../../ext/uuid-1.30/") 
 32      sys.stderr.write("Warning: Using unsupported Python version\n") 
 33  if sys.version_info < (2, 4): 
 34      sys.path.append(os.path.dirname( __file__ ) + "/../../ext/subprocess-2.6.4/") 
 35      sys.stderr.write("Warning: Using unsupported Python version\n") 
 36  if sys.version_info < (2, 3): 
 37      sys.stderr.write("Warning: Python versions <2.3 not supported\n") 
 38      sys.exit(-1) 
 39   
 40  import subprocess 
 41   
 42  """ Config parameters (will move to config file in future) """ 
 43  CONFIG_FILE="bigjob_agent.conf" 
 44  THREAD_POOL_SIZE=4 
 45  APPLICATION_NAME="bigjob" 
 46   
class bigjob_agent:
    """BigJob Agent:
        - reads new job information from communication and coordination subsystem (Redis)
        - starts new jobs
        - monitors running jobs
    """

    def __init__(self, args):
        """Initialize the agent.

        args[1] -- coordination backend url (advert://, sqlasyncadvert://,
                   redis:// or tcp://)
        args[2] -- pilot (base) url of this BigJob

        Reads bigjob_agent.conf, detects the resource-management system,
        connects the coordination backend, marks the pilot Running and starts
        the dequeue and monitoring background threads.
        """
        self.coordination_url = args[1]
        # objects to store running jobs and processes
        self.jobs = []
        self.processes = {}
        self.freenodes = []
        self.busynodes = []
        self.restarted = {}

        # read config file (lives one directory above this module)
        conf_file = os.path.dirname(os.path.abspath(__file__)) + "/../" + CONFIG_FILE
        config = ConfigParser.ConfigParser()
        logging.debug("read configfile: " + conf_file)
        config.read(conf_file)
        default_dict = config.defaults()
        self.CPR = default_dict["cpr"]
        self.SHELL = default_dict["shell"]
        self.MPIRUN = default_dict["mpirun"]
        logging.debug("cpr: " + self.CPR + " mpi: " + self.MPIRUN + " shell: " + self.SHELL)

        # init rms (SGE/PBS, falls back to local fork)
        self.init_rms()

        self.failed_polls = 0

        ##############################################################################
        # initialization of coordination and communication subsystem
        self.base_url = args[2]
        self.id = self.__get_bj_id(self.base_url)
        logger.debug("BigJob Agent arguments: " + str(args))
        logger.debug("Initialize C&C subsystem to pilot-url: " + self.base_url)
        logger.debug("BigJob ID: %s" % self.id)

        # create bj working directory and chdir into it
        self.bj_dir = os.path.join(os.getcwd(), self.id)
        try:
            os.makedirs(self.bj_dir)
        except OSError:  # narrowed from bare except: only "already exists" is expected
            logger.debug("Directory already exists.")
        os.chdir(self.bj_dir)

        # pick the coordination backend based on the URL scheme
        if (self.coordination_url.startswith("advert://")
                or self.coordination_url.startswith("sqlasyncadvert://")):
            try:
                from coordination.bigjob_coordination_advert import bigjob_coordination
                logging.debug("Utilizing ADVERT Backend: " + self.coordination_url)
            except:
                logger.error("Advert Backend could not be loaded")
                exc_type, exc_value, exc_traceback = sys.exc_info()
                traceback.print_exc(file=sys.stderr)
                traceback.print_tb(exc_traceback, file=sys.stderr)
        elif self.coordination_url.startswith("redis://"):
            try:
                from coordination.bigjob_coordination_redis import bigjob_coordination
                logger.debug("Utilizing Redis Backend: " + self.coordination_url + ". Please make sure Redis server is configured in bigjob_coordination_redis.py")
            except:
                logger.error("Error loading pyredis.")
        elif self.coordination_url.startswith("tcp://"):
            try:
                from coordination.bigjob_coordination_zmq import bigjob_coordination
                logger.debug("Utilizing ZMQ Backend")
            except:
                logger.error("ZMQ Backend not found. Please install ZeroMQ (http://www.zeromq.org/intro:get-the-software) and "
                             + "PYZMQ (http://zeromq.github.com/pyzmq/)")
        else:
            # BUG FIX: previously an unknown scheme fell through silently and the
            # call below died with a bare NameError; at least say why.
            logger.error("No coordination backend for url: " + self.coordination_url)

        self.coordination = bigjob_coordination(server_connect_url=self.coordination_url)

        # update state of pilot job to running
        self.coordination.set_pilot_state(self.base_url, str(bigjob.state.Running), False)

        ##############################################################################
        # start background threads for polling new jobs and monitoring current jobs
        self.resource_lock = threading.RLock()
        self.threadpool = ThreadPool(THREAD_POOL_SIZE)

        self.launcher_thread = threading.Thread(target=self.dequeue_new_jobs)
        self.launcher_thread.start()

        self.monitoring_thread = threading.Thread(target=self.start_background_thread)
        self.monitoring_thread.start()
142 - def __get_bj_id(self, url):
143 logger.debug("parsing ID out of URL: %s"%url) 144 start = url.index("bj-") 145 end = url.index(":", start) 146 bj_id = url[start:end] 147 return bj_id
148 149
150 - def init_rms(self):
151 if(os.environ.get("PBS_NODEFILE")!=None): 152 return self.init_pbs() 153 elif(os.environ.get("PE_HOSTFILE")!=None): 154 return self.init_sge() 155 else: 156 return self.init_local() 157 return None
158
159 - def init_local(self):
160 """ initialize free nodes list with dummy (for fork jobs)""" 161 try: 162 num_cpus = self.get_num_cpus() 163 for i in range(0, num_cpus): 164 self.freenodes.append("localhost\n") 165 except IOError: 166 self.freenodes=["localhost\n"]
167
168 - def init_sge(self):
169 """ initialize free nodes list from SGE environment """ 170 sge_node_file = os.environ.get("PE_HOSTFILE") 171 if sge_node_file == None: 172 return 173 f = open(sge_node_file) 174 sgenodes = f.readlines() 175 f.close() 176 for i in sgenodes: 177 178 columns = i.split() 179 try: 180 for j in range(0, int(columns[1])): 181 logger.debug("add host: " + columns[0].strip()) 182 self.freenodes.append(columns[0]+"\n") 183 except: 184 pass 185 return self.freenodes
186
187 - def init_pbs(self):
188 """ initialize free nodes list from PBS environment """ 189 pbs_node_file = os.environ.get("PBS_NODEFILE") 190 if pbs_node_file == None: 191 return 192 f = open(pbs_node_file) 193 self.freenodes = f.readlines() 194 f.close() 195 196 # check whether pbs node file contains the correct number of nodes 197 num_cpus = self.get_num_cpus() 198 node_dict={} 199 for i in set(self.freenodes): 200 node_dict[i] = self.freenodes.count(i) 201 if node_dict[i] < num_cpus: 202 node_dict[i] = num_cpus 203 204 self.freenodes=[] 205 for i in node_dict.keys(): 206 logger.debug("host: " + i + " nodes: " + str(node_dict[i])) 207 for j in range(0, node_dict[i]): 208 logger.debug("add host: " + i.strip()) 209 self.freenodes.append(i)
210
211 - def get_num_cpus(self):
212 cpuinfo = open("/proc/cpuinfo", "r") 213 cpus = cpuinfo.readlines() 214 cpuinfo.close() 215 num = 0 216 for i in cpus: 217 if i.startswith("processor"): 218 num = num+1 219 return num
220 221
222 - def execute_job(self, job_url, job_dict):
223 """ obtain job attributes from c&c and execute process """ 224 state=str(job_dict["state"]) 225 226 #try: 227 # state = self.coordination.get_job_state(job_url) 228 #except: 229 # logger.error("Could not access job state... skip execution attempt and requeuing job") 230 # result = self.coordination.queue_job(self.base_url, job_url) 231 # if result == False: 232 # self.coordination.set_job_state(job_url, str(saga.job.Failed)) 233 234 if(state==str(bigjob.state.Unknown) or 235 state==str(bigjob.state.New)): 236 try: 237 #job_dict["state"]=str(saga.job.New) 238 job_id = job_dict["job-id"] 239 logger.debug("Start job id %s specification %s: "%(job_id, str(job_dict))) 240 numberofprocesses = "1" 241 try: 242 if (job_dict.has_key("NumberOfProcesses") == True): 243 numberofprocesses = job_dict["NumberOfProcesses"] 244 except: 245 pass # ignore in particular if Bliss is used 246 247 spmdvariation="single" 248 try: 249 if (job_dict.has_key("SPMDVariation") == True): 250 spmdvariation = job_dict["SPMDVariation"] 251 except: 252 pass # ignore in particular if Bliss is used 253 254 arguments = "" 255 if (job_dict.has_key("Arguments") == True): 256 arguments_raw = job_dict['Arguments']; 257 if type(arguments_raw) == types.ListType: 258 arguments_list = arguments_raw 259 else: 260 arguments_list = eval(job_dict["Arguments"]) 261 for i in arguments_list: 262 arguments = arguments + " " + i 263 264 environment = os.environ 265 envi = "" 266 if (job_dict.has_key("Environment") == True): 267 env_raw = job_dict['Environment'] 268 if type(env_raw) == types.ListType: 269 env_list = env_raw 270 else: 271 env_list = eval(job_dict["Environment"]) 272 for i in env_list: 273 envi_1 = "export " + i +"; " 274 envi = envi + envi_1 275 276 executable = job_dict["Executable"] 277 278 workingdirectory = os.path.join(os.getcwd(), job_id) 279 if (job_dict.has_key("WorkingDirectory") == True): 280 workingdirectory = job_dict["WorkingDirectory"] 281 try: 282 os.makedirs(workingdirectory) 283 
except: 284 logger.debug("Directory %s already exists."%workingdirectory) 285 logging.debug("Sub-Job: %s, Working_directory: %s"%(job_id, workingdirectory)) 286 287 288 output="stdout" 289 if (job_dict.has_key("Output") == True): 290 output = job_dict["Output"] 291 if not os.path.isabs(output): 292 output=os.path.join(workingdirectory, output) 293 294 error=os.path.join(workingdirectory,"stderr") 295 if (job_dict.has_key("Error") == True): 296 error = job_dict["Error"] 297 if not os.path.isabs(error): 298 error=os.path.join(workingdirectory, error) 299 300 # append job to job list 301 self.jobs.append(job_url) 302 303 # create stdout/stderr file descriptors 304 output_file = os.path.abspath(output) 305 error_file = os.path.abspath(error) 306 logger.debug("stdout: " + output_file + " stderr: " + error_file 307 + " env: " + str(environment)) 308 stdout = open(output_file, "w") 309 stderr = open(error_file, "w") 310 if ( spmdvariation.lower( )!="mpi"): 311 command = envi + executable + " " + arguments 312 else: 313 command = executable + " " + arguments 314 #pdb.set_trace() 315 # special setup for MPI NAMD jobs 316 machinefile = self.allocate_nodes(job_dict) 317 host = "localhost" 318 try: 319 machine_file_handler = open(machinefile, "r") 320 node= machine_file_handler.readlines() 321 machine_file_handler.close() 322 host = node[0].strip() 323 except: 324 pass 325 326 327 if(machinefile==None): 328 logger.debug("Not enough resources to run: " + job_url) 329 self.coordination.queue_job(self.base_url, job_url) 330 return # job cannot be run at the moment 331 332 # start application process 333 if (spmdvariation.lower( )=="mpi"): 334 command = "cd " + workingdirectory + "; " + envi + self.MPIRUN + " -np " + numberofprocesses + " -machinefile " + machinefile + " " + command 335 #if (host != socket.gethostname()): 336 # command ="ssh " + host + " \"cd " + workingdirectory + "; " + command +"\"" 337 else: 338 command ="ssh " + host + " \"cd " + workingdirectory + "; " + 
command +"\"" 339 shell = self.SHELL 340 logger.debug("execute: " + command + " in " + workingdirectory + " from: " + str(socket.gethostname()) + " (Shell: " + shell +")") 341 # bash works fine for launching on QB but fails for Abe :-( 342 p = subprocess.Popen(args=command, executable=shell, stderr=stderr, 343 stdout=stdout, cwd=workingdirectory, 344 env=environment, shell=True) 345 logger.debug("started " + command) 346 self.processes[job_url] = p 347 self.coordination.set_job_state(job_url, str(bigjob.state.Running)) 348 except: 349 traceback.print_exc(file=sys.stderr)
350 351
352 - def allocate_nodes(self, job_dict):
353 """ allocate nodes 354 allocated nodes will be written to machinefile advert-launcher-machines-<jobid> 355 """ 356 self.resource_lock.acquire() 357 number_nodes = int(job_dict["NumberOfProcesses"]) 358 nodes = [] 359 machine_file_name = None 360 if (len(self.freenodes)>=number_nodes): 361 unique_nodes=set(self.freenodes) 362 for i in unique_nodes: 363 number = self.freenodes.count(i) 364 logger.debug("allocate: " + i + " number nodes: " + str(number) 365 + " current busy nodes: " + str(self.busynodes) 366 + " free nodes: " + str(self.freenodes)) 367 for j in range(0, number): 368 if(number_nodes > 0): 369 nodes.append(i) 370 self.freenodes.remove(i) 371 self.busynodes.append(i) 372 number_nodes = number_nodes - 1 373 else: 374 break 375 376 machine_file_name = self.get_machine_file_name(job_dict) 377 machine_file = open(machine_file_name, "w") 378 #machine_file.writelines(self.freenodes[:number_nodes]) 379 machine_file.writelines(nodes) 380 machine_file.close() 381 logger.debug("wrote machinefile: " + machine_file_name + " Nodes: " + str(nodes)) 382 # update node structures 383 #self.busynodes.extend(self.freenodes[:number_nodes]) 384 #del(self.freenodes[:number_nodes]) 385 386 self.resource_lock.release() 387 return machine_file_name
388 389 390
391 - def setup_charmpp_nodefile(self, allocated_nodes):
392 """ Setup charm++ nodefile to use for executing NAMD 393 HACK!! Method violates layering principle 394 File $HOME/machinefile in charm++ nodefileformat is written to first node in list 395 """ 396 # Nodelist format: 397 # 398 # host tp-x001 ++cpus 2 ++shell ssh 399 # host tp-x002 ++cpus 2 ++shell ssh 400 401 nodefile_string="" 402 for i in allocated_nodes: 403 if i.has_key("private_hostname"): 404 nodefile_string=nodefile_string + "host "+ i["private_hostname"] + " ++cpus " + str(i["cpu_count"]) + " ++shell ssh\n" 405 else: 406 nodefile_string=nodefile_string + "host "+ i["hostname"] + " ++cpus " + str(i["cpu_count"]) + " ++shell ssh\n" 407 408 # copy nodefile to rank 0 node 409 jd = saga.job.description() 410 jd.executable = "echo" 411 jd.number_of_processes = "1" 412 jd.spmd_variation = "single" 413 # ssh root@tp-x001.ci.uchicago.edu "cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys" 414 jd.arguments = ["\""+nodefile_string+"\"", ">", "machinefile"] 415 jd.output = "stdout.txt" 416 jd.error = "stderr.txt" 417 418 job_service_url = saga.url("ssh://root@"+allocated_nodes[0]["hostname"]) 419 job_service = saga.job.service(self.session, job_service_url) 420 job = job_service.create_job(jd) 421 job.run() 422 job.wait()
423
424 - def print_machine_file(self, filename):
425 fh = open(filename, "r") 426 lines = fh.readlines() 427 fh.close 428 logger.debug("Machinefile: " + filename + " Hosts: " + str(lines))
429
430 - def free_nodes(self, job_url):
431 job_dict = self.coordination.get_job(job_url) 432 self.resource_lock.acquire() 433 number_nodes = int(job_dict["NumberOfProcesses"]) 434 machine_file_name = self.get_machine_file_name(job_dict) 435 logger.debug("Machine file: " + machine_file_name) 436 allocated_nodes = ["localhost\n"] 437 try: 438 machine_file = open(machine_file_name, "r") 439 allocated_nodes = machine_file.readlines() 440 machine_file.close() 441 except: 442 traceback.print_exc(file=sys.stderr) 443 444 logger.debug("Free nodes: " + str(allocated_nodes)) 445 446 for i in allocated_nodes: 447 logger.debug("free node: " + str(i) + " current busy nodes: " + str(self.busynodes) 448 + " free nodes: " + str(self.freenodes)) 449 self.busynodes.remove(i) 450 self.freenodes.append(i) 451 logger.debug("Delete " + machine_file_name) 452 if os.path.exists(machine_file_name): 453 os.remove(machine_file_name) 454 self.resource_lock.release()
455 456
457 - def get_machine_file_name(self, job_dict):
458 """create machinefile based on jobid""" 459 job_id = job_dict["job-id"] 460 homedir = os.path.expanduser('~') 461 return homedir + "/advert-launcher-machines-"+ job_id
462
463 - def dequeue_new_jobs(self):
464 """Subscribe to new jobs from Redis. """ 465 job_counter = 0 466 while self.is_stopped(self.base_url)==False: 467 if len(self.freenodes)==0: 468 time.sleep(3) 469 continue 470 logger.debug("Dequeue sub-job from: " + self.base_url) 471 job_url=self.coordination.dequeue_job(self.base_url) 472 logger.debug("Dequed:%s"%str(job_url)) 473 if job_url==None: 474 time.sleep(3) 475 continue 476 if job_url=="STOP": 477 break 478 479 job_counter = job_counter + 1 480 if (job_counter % (THREAD_POOL_SIZE))==0: # ensure that threadpool is not too overloaded 481 self.threadpool.wait() 482 483 request = WorkRequest(self.start_new_job_in_thread, [job_url]) 484 self.threadpool.putRequest(request) 485 486 # wait for termination of Worker Threads 487 self.threadpool.wait() 488 logger.debug("Terminating Agent - Dequeue Sub-Jobs Thread")
489 490 #def poll_jobs(self): 491 # self.threadpool.wait() 492 # new_jobs=self.redis.keys(self.base_url+":*") 493 # logger.debug("All jobs:" + str(new_jobs)) 494 # for i in new_jobs: 495 # request = WorkRequest(self.start_new_job_in_thread, [str(i)]) 496 # logger.debug("WorkRequest: " + str(request)) 497 # self.threadpool.putRequest(request) 498
499 - def start_new_job_in_thread(self, job_url):
500 """evaluates job dir, sanity checks, executes job """ 501 #pdb.set_trace() 502 if job_url != None: 503 failed = False; 504 try: 505 logger.debug("Get job description") 506 job_dict = self.coordination.get_job(job_url) 507 except: 508 logger.error("Failed to get job description") 509 failed=True 510 511 if job_dict==None or failed==True: 512 self.coordination.queue_job(self.pilot_url, job_url) 513 514 logger.debug("start job: " + job_url + " data: " + str(job_dict)) 515 if(job_dict["state"]==str(bigjob.state.Unknown)): 516 job_dict["state"]=str(bigjob.state.New) 517 self.coordination.set_job_state(job_url, str(bigjob.state.New)) 518 self.execute_job(job_url, job_dict)
519 #print "Execute: " + str(job_dict) 520
521 - def monitor_jobs(self):
522 """Monitor running processes. """ 523 #pdb.set_trace() 524 logger.debug("Monitor jobs - # current jobs: %d"%len(self.jobs)) 525 for i in self.jobs: 526 if self.processes.has_key(i): # only if job has already been starteds 527 p = self.processes[i] 528 p_state = p.poll() 529 logger.debug(self.print_job(i) + " state: " + str(p_state) + " return code: " + str(p.returncode)) 530 if (p_state != None and (p_state==0 or p_state==255)): 531 logger.debug("Job successful: " + self.print_job(i)) 532 self.coordination.set_job_state(i, str(bigjob.state.Done)) 533 #i.set_attribute("state", str(saga.job.Done)) 534 self.free_nodes(i) 535 del self.processes[i] 536 elif p_state!=0 and p_state!=255 and p_state != None: 537 logger.debug(self.print_job(i) + " failed. ") 538 # do not free nodes => very likely the job will fail on these nodes 539 # self.free_nodes(i) 540 #if self.restarted.has_key(i)==False: 541 # logger.debug("Try to restart job " + self.print_job(i)) 542 # self.restarted[i]=True 543 # self.execute_job(i) 544 #else: 545 logger.debug("Job failed " + self.print_job(i)) 546 self.coordination.set_job_state(i, str(bigjob.state.Failed)) 547 self.free_nodes(i) 548 del self.processes[i]
549
550 - def print_job(self, job_url):
551 job_dict = self.coordination.get_job(job_url) 552 return ("Job: " + job_url + " Excutable: " + job_dict["Executable"])
553 554
555 - def start_background_thread(self):
556 self.stop=False 557 logger.debug("##################################### New POLL/MONITOR cycle ##################################") 558 logger.debug("Free nodes: " + str(len(self.freenodes)) + " Busy Nodes: " + str(len(self.busynodes))) 559 while True and self.stop==False: 560 if self.is_stopped(self.base_url)==True: 561 logger.debug("Pilot job entry deleted - terminate agent") 562 break 563 else: 564 logger.debug("Pilot job entry: " + str(self.base_url) + " exists. Pilot job not in state stopped.") 565 try: 566 #self.poll_jobs() 567 self.monitor_jobs() 568 time.sleep(5) 569 self.failed_polls=0 570 except: 571 traceback.print_exc(file=sys.stdout) 572 self.failed_polls=self.failed_polls+1 573 if self.failed_polls>3: # after 3 failed attempts exit 574 break 575 logger.debug("Terminating Agent - Background Thread") 576 577
578 - def is_stopped(self, base_url):
579 state = None 580 try: 581 state = self.coordination.get_pilot_state(base_url) 582 except: 583 pass 584 logger.debug("Pilot State: " + str(state)) 585 if state==None or state.has_key("stopped")==False or state["stopped"]==True: 586 return True 587 else: 588 return False
589 590
591 - def stop_background_thread(self):
592 self.stop=True 593 594 595 ######################################################### 596 # main # 597 ######################################################### 598 if __name__ == "__main__" : 599 args = sys.argv 600 num_args = len(args) 601 if (num_args!=3): 602 print "Usage: \n " + args[0] + " <coordination host url> <coordination namespace url>" 603 sys.exit(1) 604 605 bigjob_agent = bigjob_agent(args) 606