
Source Code for Module bigjob.bigjob_agent_condor

#!/usr/bin/env python
"""bigjob_agent: bigjob agent adapted for Condor resources
"""
import sys
import os
import bigjob.state
import socket
import threading
import time
import pdb
import traceback
import ConfigParser
import types
import logging
logging.basicConfig(level=logging.DEBUG)

try:
    import saga
except:
    logging.warning("SAGA could not be found. Not all functionalities working")

sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../ext/threadpool-1.2.7/src/")
logging.debug(str(sys.path))
from threadpool import *

if sys.version_info < (2, 5):
    sys.path.append(os.path.dirname(__file__) + "/../../ext/uuid-1.30/")
    sys.stderr.write("Warning: Using unsupported Python version\n")
if sys.version_info < (2, 4):
    sys.path.append(os.path.dirname(__file__) + "/../../ext/subprocess-2.6.4/")
    sys.stderr.write("Warning: Using unsupported Python version\n")
if sys.version_info < (2, 3):
    sys.stderr.write("Warning: Python versions <2.3 not supported\n")
    sys.exit(-1)

import subprocess

""" Config parameters (will move to config file in future) """
CONFIG_FILE = "bigjob_agent.conf"
THREAD_POOL_SIZE = 4
APPLICATION_NAME = "bigjob"

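# Illustrative sketch (not part of the original module): the constructor below
# reads bigjob_agent.conf from one directory above this file and expects the
# keys "cpr", "shell", and "mpirun" in the [DEFAULT] section. The values shown
# here are placeholders, not the project's shipped defaults:
#
#   [DEFAULT]
#   cpr = false
#   shell = /bin/bash
#   mpirun = mpirun
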
class bigjob_agent:

    """BigJob Agent:
        - reads new job information from communication and coordination subsystem (Redis)
        - starts new jobs
        - monitors running jobs
    """

    """Constructor"""
    def __init__(self, args):

        self.coordination_url = args[1]
        # objects to store running jobs and processes
        self.jobs = []
        self.processes = {}
        self.freenodes = []
        self.busynodes = []
        self.restarted = {}

        # read config file
        # conf_file = os.path.dirname(args[0]) + "/" + CONFIG_FILE
        # conf_file = os.path.dirname(os.path.abspath(__file__)) + "/" + CONFIG_FILE
        conf_file = os.path.dirname(os.path.abspath(__file__)) + "/../" + CONFIG_FILE
        config = ConfigParser.ConfigParser()
        logging.debug("read configfile: " + conf_file)
        config.read(conf_file)
        default_dict = config.defaults()
        self.CPR = default_dict["cpr"]
        self.SHELL = default_dict["shell"]
        self.MPIRUN = default_dict["mpirun"]
        logging.debug("cpr: " + self.CPR + " mpi: " + self.MPIRUN + " shell: " + self.SHELL)

        # init rms (SGE/PBS)
        self.init_rms()

        self.failed_polls = 0

        ##############################################################################
        # initialization of coordination and communication subsystem
        # Redis initialization
        self.base_url = args[2]
        logging.debug("BigJob Agent arguments: " + str(args))
        logging.debug("Initialize C&C subsystem to pilot-url: " + self.base_url)

        if (self.coordination_url.startswith("advert://")):
            try:
                from coordination.bigjob_coordination_advert import bigjob_coordination
                logging.debug("Utilizing ADVERT Backend: " + self.coordination_url)
            except:
                logging.error("Advert Backend could not be loaded")
        elif (self.coordination_url.startswith("redis://")):
            try:
                from coordination.bigjob_coordination_redis import bigjob_coordination
                logging.debug("Utilizing Redis Backend: " + self.coordination_url + ". Please make sure Redis server is configured in bigjob_coordination_redis.py")
            except:
                logging.error("Error loading pyredis.")
        elif (self.coordination_url.startswith("tcp://")):
            try:
                from coordination.bigjob_coordination_zmq import bigjob_coordination
                logging.debug("Utilizing ZMQ Backend")
            except:
                logging.error("ZMQ Backend not found. Please install ZeroMQ (http://www.zeromq.org/intro:get-the-software) and "
                              + "PYZMQ (http://zeromq.github.com/pyzmq/)")

        self.coordination = bigjob_coordination(server_connect_url=self.coordination_url)

        # update state of pilot job to running
        self.coordination.set_pilot_state(self.base_url, str(bigjob.state.Running), False)

        ##############################################################################
        # start background thread for polling new jobs and monitoring current jobs
        self.resource_lock = threading.RLock()
        self.threadpool = ThreadPool(THREAD_POOL_SIZE)

        self.launcher_thread = threading.Thread(target=self.dequeue_new_jobs)
        self.launcher_thread.start()

        self.monitoring_thread = threading.Thread(target=self.start_background_thread)
        self.monitoring_thread.start()

    def init_rms(self):
        if (os.environ.get("PBS_NODEFILE") != None):
            return self.init_pbs()
        elif (os.environ.get("PE_HOSTFILE") != None):
            return self.init_sge()
        else:
            return self.init_local()
        return None

    def init_local(self):
        """ initialize free nodes list with dummy (for fork jobs)"""
        try:
            num_cpus = self.get_num_cpus()
            for i in range(0, num_cpus):
                self.freenodes.append("localhost\n")
        except IOError:
            self.freenodes = ["localhost\n"]

    def init_sge(self):
        """ initialize free nodes list from SGE environment """
        sge_node_file = os.environ.get("PE_HOSTFILE")
        if sge_node_file == None:
            return
        f = open(sge_node_file)
        sgenodes = f.readlines()
        f.close()
        for i in sgenodes:

            columns = i.split()
            try:
                for j in range(0, int(columns[1])):
                    logging.debug("add host: " + columns[0].strip())
                    self.freenodes.append(columns[0] + "\n")
            except:
                pass
        return self.freenodes

    def init_pbs(self):
        """ initialize free nodes list from PBS environment """
        pbs_node_file = os.environ.get("PBS_NODEFILE")
        if pbs_node_file == None:
            return
        f = open(pbs_node_file)
        self.freenodes = f.readlines()
        f.close()

        # check whether pbs node file contains the correct number of nodes
        num_cpus = self.get_num_cpus()
        node_dict = {}
        for i in set(self.freenodes):
            node_dict[i] = self.freenodes.count(i)
            if node_dict[i] < num_cpus:
                node_dict[i] = num_cpus

        self.freenodes = []
        for i in node_dict.keys():
            logging.debug("host: " + i + " nodes: " + str(node_dict[i]))
            for j in range(0, node_dict[i]):
                logging.debug("add host: " + i.strip())
                self.freenodes.append(i)

    def get_num_cpus(self):
        cpuinfo = open("/proc/cpuinfo", "r")
        cpus = cpuinfo.readlines()
        cpuinfo.close()
        num = 0
        for i in cpus:
            if i.startswith("processor"):
                num = num + 1
        return num

    def execute_job(self, job_url, job_dict):
        """ obtain job attributes from c&c and execute process """
        state = str(job_dict["state"])

        #try:
        #    state = self.coordination.get_job_state(job_url)
        #except:
        #    logging.error("Could not access job state... skip execution attempt and requeuing job")
        #    result = self.coordination.queue_job(self.base_url, job_url)
        #    if result == False:
        #        self.coordination.set_job_state(job_url, str(saga.job.Failed))

        if (state == str(bigjob.state.Unknown) or
            state == str(bigjob.state.New)):
            try:
                #job_dict["state"]=str(saga.job.New)
                logging.debug("Start job: " + str(job_dict))
                numberofprocesses = "1"
                if (job_dict.has_key("NumberOfProcesses") == True):
                    numberofprocesses = job_dict["NumberOfProcesses"]

                spmdvariation = "single"
                if (job_dict.has_key("SPMDVariation") == True):
                    spmdvariation = job_dict["SPMDVariation"]

                arguments = ""
                if (job_dict.has_key("Arguments") == True):
                    arguments_raw = job_dict['Arguments']
                    if type(arguments_raw) == types.ListType:
                        arguments_list = arguments_raw
                    else:
                        arguments_list = eval(job_dict["Arguments"])
                    for i in arguments_list:
                        arguments = arguments + " " + i

                workingdirectory = os.getcwd()
                if (job_dict.has_key("WorkingDirectory") == True):
                    workingdirectory = job_dict["WorkingDirectory"]

                environment = os.environ
                if (job_dict.has_key("Environment") == True):
                    for i in job_dict["Environment"]:
                        env = i.split("=")
                        environment[env[0]] = env[1] + ":" + environment[env[0]]

                environment["PATH"] = workingdirectory + ":" + environment["PATH"]
                print "environment[PATH]", environment["PATH"]
                executable = job_dict["Executable"]

                output = "stdout"
                if (job_dict.has_key("Output") == True):
                    output = job_dict["Output"]

                error = "stderr"
                if (job_dict.has_key("Error") == True):
                    error = job_dict["Error"]

                # append job to job list
                self.jobs.append(job_url)

                # create stdout/stderr file descriptors
                output_file = os.path.join(workingdirectory, output)
                error_file = os.path.join(workingdirectory, error)
                logging.debug("stdout: " + output_file + " stderr: " + error_file + " env: " + str(environment))
                stdout = open(output_file, "w")
                stderr = open(error_file, "w")
                #if not "/" in executable:
                #    command = workingdirectory + "/" + executable + " " + arguments
                command = executable + " " + arguments
                #pdb.set_trace()
                # special setup for MPI NAMD jobs
                machinefile = self.allocate_nodes(job_dict)
                host = "localhost"
                try:
                    machine_file_handler = open(machinefile, "r")
                    node = machine_file_handler.readlines()
                    machine_file_handler.close()
                    host = node[0].strip()
                except:
                    pass

                if (machinefile == None):
                    logging.debug("Not enough resources to run: " + job_url)
                    self.coordination.queue_job(self.base_url, job_url)
                    return  # job cannot be run at the moment

                # start application process
                if (spmdvariation.lower() == "mpi"):
                    command = "cd " + workingdirectory + "; " + self.MPIRUN + " -np " + numberofprocesses + " -machinefile " + machinefile + " " + command
                    #if (host != socket.gethostname()):
                    #    command = "ssh " + host + " \"cd " + workingdirectory + "; " + command + "\""
                else:
                    command = "chmod +x " + executable + ";export PATH=$PATH:" + workingdirectory + ";" + command
                shell = self.SHELL
                logging.debug("execute: " + command + " in " + workingdirectory + " from: " + str(socket.gethostname()) + " (Shell: " + shell + ")")
                # bash works fine for launching on QB but fails for Abe :-(
                p = subprocess.Popen(args=command, executable=shell, stderr=stderr,
                                     stdout=stdout, cwd=workingdirectory,
                                     env=environment, shell=True)
                logging.debug("started " + command)

                # for condor debugging
                dirlist = os.listdir(workingdirectory)
                print dirlist
                os.system("ls;pwd")

                self.processes[job_url] = p
                self.coordination.set_job_state(job_url, str(bigjob.state.Running))
            except:
                traceback.print_exc(file=sys.stderr)

    def allocate_nodes(self, job_dict):
        """ allocate nodes
            allocated nodes will be written to machinefile advert-launcher-machines-<jobid>
        """
        self.resource_lock.acquire()
        number_nodes = int(job_dict["NumberOfProcesses"])
        nodes = []
        machine_file_name = None
        if (len(self.freenodes) >= number_nodes):
            unique_nodes = set(self.freenodes)
            for i in unique_nodes:
                number = self.freenodes.count(i)
                logging.debug("allocate: " + i + " number nodes: " + str(number)
                              + " current busy nodes: " + str(self.busynodes)
                              + " free nodes: " + str(self.freenodes))
                for j in range(0, number):
                    if (number_nodes > 0):
                        nodes.append(i)
                        self.freenodes.remove(i)
                        self.busynodes.append(i)
                        number_nodes = number_nodes - 1
                    else:
                        break

            machine_file_name = self.get_machine_file_name(job_dict)
            machine_file = open(machine_file_name, "w")
            #machine_file.writelines(self.freenodes[:number_nodes])
            machine_file.writelines(nodes)
            machine_file.close()
            logging.debug("wrote machinefile: " + machine_file_name + " Nodes: " + str(nodes))
            # update node structures
            #self.busynodes.extend(self.freenodes[:number_nodes])
            #del(self.freenodes[:number_nodes])

        self.resource_lock.release()
        return machine_file_name

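    # Illustrative sketch (not part of the original module): the machinefile
    # written by allocate_nodes lists one allocated hostname per line, e.g.
    #
    #   node001
    #   node001
    #   node002
    #
    # (hostnames are placeholders). For MPI jobs, execute_job passes this file
    # to mpirun via "-machinefile"; free_nodes reads it back to return the
    # hosts to the free list.
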
    def setup_charmpp_nodefile(self, allocated_nodes):
        """ Setup charm++ nodefile to use for executing NAMD
            HACK!! Method violates layering principle
            File $HOME/machinefile in charm++ nodefile format is written to first node in list
        """
        # Nodelist format:
        #
        # host tp-x001 ++cpus 2 ++shell ssh
        # host tp-x002 ++cpus 2 ++shell ssh

        nodefile_string = ""
        for i in allocated_nodes:
            if i.has_key("private_hostname"):
                nodefile_string = nodefile_string + "host " + i["private_hostname"] + " ++cpus " + str(i["cpu_count"]) + " ++shell ssh\n"
            else:
                nodefile_string = nodefile_string + "host " + i["hostname"] + " ++cpus " + str(i["cpu_count"]) + " ++shell ssh\n"

        # copy nodefile to rank 0 node
        jd = saga.job.description()
        jd.executable = "echo"
        jd.number_of_processes = "1"
        jd.spmd_variation = "single"
        # ssh root@tp-x001.ci.uchicago.edu "cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys"
        jd.arguments = ["\"" + nodefile_string + "\"", ">", "machinefile"]
        jd.output = "stdout.txt"
        jd.error = "stderr.txt"

        job_service_url = saga.url("ssh://root@" + allocated_nodes[0]["hostname"])
        job_service = saga.job.service(self.session, job_service_url)
        job = job_service.create_job(jd)
        job.run()
        job.wait()

    def print_machine_file(self, filename):
        fh = open(filename, "r")
        lines = fh.readlines()
        fh.close()
        logging.debug("Machinefile: " + filename + " Hosts: " + str(lines))

    def free_nodes(self, job_url):
        job_dict = self.coordination.get_job(job_url)
        self.resource_lock.acquire()
        number_nodes = int(job_dict["NumberOfProcesses"])
        machine_file_name = self.get_machine_file_name(job_dict)
        logging.debug("Machine file: " + machine_file_name)
        allocated_nodes = ["localhost\n"]
        try:
            machine_file = open(machine_file_name, "r")
            allocated_nodes = machine_file.readlines()
            machine_file.close()
        except:
            traceback.print_exc(file=sys.stderr)

        logging.debug("Free nodes: " + str(allocated_nodes))

        for i in allocated_nodes:
            logging.debug("free node: " + str(i) + " current busy nodes: " + str(self.busynodes)
                          + " free nodes: " + str(self.freenodes))
            self.busynodes.remove(i)
            self.freenodes.append(i)
        logging.debug("Delete " + machine_file_name)
        if os.path.exists(machine_file_name):
            os.remove(machine_file_name)
        self.resource_lock.release()

    def get_machine_file_name(self, job_dict):
        """create machinefile based on jobid"""
        job_id = job_dict["job-id"]
        homedir = os.path.expanduser('~')
        return homedir + "/advert-launcher-machines-" + job_id

    def dequeue_new_jobs(self):
        """Subscribe to new jobs from Redis. """
        job_counter = 0
        while self.is_stopped(self.base_url) == False:
            if len(self.freenodes) == 0:
                time.sleep(3)
                continue
            logging.debug("Dequeue sub-job from: " + self.base_url)
            job_url = self.coordination.dequeue_job(self.base_url)
            if job_url == None:
                time.sleep(3)
                continue
            if job_url == "STOP":
                break

            job_counter = job_counter + 1
            if (job_counter % (THREAD_POOL_SIZE)) == 0:  # ensure that threadpool is not too overloaded
                self.threadpool.wait()

            request = WorkRequest(self.start_new_job_in_thread, [job_url])
            self.threadpool.putRequest(request)

        # wait for termination of Worker Threads
        self.threadpool.wait()
        logging.debug("Terminating Agent - Dequeue Sub-Jobs Thread")

    #def poll_jobs(self):
    #    self.threadpool.wait()
    #    new_jobs = self.redis.keys(self.base_url + ":*")
    #    logging.debug("All jobs:" + str(new_jobs))
    #    for i in new_jobs:
    #        request = WorkRequest(self.start_new_job_in_thread, [str(i)])
    #        logging.debug("WorkRequest: " + str(request))
    #        self.threadpool.putRequest(request)

    def start_new_job_in_thread(self, job_url):
        """evaluates job dir, sanity checks, executes job """
        #pdb.set_trace()
        if job_url != None:
            failed = False
            try:
                job_dict = self.coordination.get_job(job_url)
            except:
                failed = True

            if job_dict == None or failed == True:
                self.coordination.queue_job(self.pilot_url, job_url)

            logging.debug("start job: " + job_url + " data: " + str(job_dict))
            if (job_dict["state"] == str(bigjob.state.Unknown)):
                job_dict["state"] = str(bigjob.state.New)
                self.coordination.set_job_state(job_url, str(bigjob.state.New))
            self.execute_job(job_url, job_dict)
            #print "Execute: " + str(job_dict)

    def monitor_jobs(self):
        """Monitor running processes. """
        #pdb.set_trace()
        logging.debug("Monitor jobs - # current jobs: %d" % len(self.jobs))
        for i in self.jobs:
            if self.processes.has_key(i):  # only if job has already been started
                p = self.processes[i]
                p_state = p.poll()
                logging.debug(self.print_job(i) + " state: " + str(p_state) + " return code: " + str(p.returncode))
                if (p_state != None and (p_state == 0 or p_state == 255)):
                    logging.debug("Job successful: " + self.print_job(i))
                    self.coordination.set_job_state(i, str(bigjob.state.Done))
                    #i.set_attribute("state", str(saga.job.Done))
                    self.free_nodes(i)
                    del self.processes[i]
                elif p_state != 0 and p_state != 255 and p_state != None:
                    logging.debug(self.print_job(i) + " failed. ")
                    # do not free nodes => very likely the job will fail on these nodes
                    #self.free_nodes(i)
                    #if self.restarted.has_key(i)==False:
                    #    logging.debug("Try to restart job " + self.print_job(i))
                    #    self.restarted[i]=True
                    #    self.execute_job(i)
                    #else:
                    logging.debug("Job failed " + self.print_job(i))
                    self.coordination.set_job_state(i, str(bigjob.state.Failed))
                    self.free_nodes(i)
                    del self.processes[i]

    def print_job(self, job_url):
        job_dict = self.coordination.get_job(job_url)
        return ("Job: " + job_url
                + " Executable: " + job_dict["Executable"])

    def start_background_thread(self):
        self.stop = False
        logging.debug("##################################### New POLL/MONITOR cycle ##################################")
        logging.debug("Free nodes: " + str(len(self.freenodes)) + " Busy Nodes: " + str(len(self.busynodes)))
        while True and self.stop == False:
            if self.is_stopped(self.base_url) == True:
                logging.debug("Pilot job entry deleted - terminate agent")
                break
            else:
                logging.debug("Pilot job entry: " + str(self.base_url) + " exists. Pilot job not in state stopped.")
            try:
                #self.poll_jobs()
                self.monitor_jobs()
                time.sleep(5)
                self.failed_polls = 0
            except:
                traceback.print_exc(file=sys.stdout)
                self.failed_polls = self.failed_polls + 1
                if self.failed_polls > 3:  # after 3 failed attempts exit
                    break
        logging.debug("Terminating Agent - Background Thread")

    def is_stopped(self, base_url):
        state = None
        try:
            state = self.coordination.get_pilot_state(base_url)
        except:
            pass
        logging.debug("Pilot State: " + str(state))
        if state == None or state.has_key("stopped") == False or state["stopped"] == True:
            return True
        else:
            return False

    def stop_background_thread(self):
        self.stop = True


#########################################################
#                         main                          #
#########################################################
if __name__ == "__main__":
    args = sys.argv
    num_args = len(args)
    if (num_args != 3):
        print "Usage: \n " + args[0] + " <coordination host url> <coordination namespace url>"
        sys.exit(1)

    bigjob_agent = bigjob_agent(args)
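
# Illustrative invocation sketch (not part of the original module; URLs are
# placeholders): the agent expects the coordination backend URL (advert://,
# redis://, or tcp://) as its first argument and the pilot's namespace URL as
# its second, e.g.
#
#   python bigjob_agent_condor.py redis://localhost:6379 \
#       redis://localhost:6379/bigjob/pilot-job-1234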