Package bigjob :: Module bigjob_manager

Source Code for Module bigjob.bigjob_manager

#!/usr/bin/env python

"""Module bigjob_manager.

This module is used to launch jobs via the advert service.

It assumes that a bigjob_agent.py is available on the remote machine.
bigjob_agent.py polls the advert service for new jobs and runs them on the
respective machine.

Background: This approach avoids queueing delays, since bigjob_agent_launcher.py
only needs to be started once via saga.job or saga.cpr. All short-running tasks
are then started using the protocol implemented by subjob() and bigjob_agent.py.

Installation:
Set the environment variable BIGJOB_HOME to the installation directory.
"""
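
# A minimal usage sketch (hypothetical host names and paths; assumes a Redis
# server is reachable at localhost:6379 and that this module is importable as
# bigjob.bigjob_manager):
#
#   from bigjob.bigjob_manager import bigjob, subjob, description
#
#   bj = bigjob(coordination_url="redis://localhost:6379")
#   bj.start_pilot_job(lrms_url="fork://localhost/",
#                      number_nodes=1,
#                      working_directory="/tmp/bigjob",
#                      walltime=10)
#
#   jd = description()
#   jd.executable = "/bin/date"
#   sj = subjob()
#   sj.submit_job(bj.pilot_url, jd)
#   print sj.get_state()
#   bj.cancel()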

import sys
from bigjob import logger
import time
import os
import traceback
import logging
import textwrap
import urlparse

try:
    import paramiko
except ImportError:
    logger.warn("Paramiko not found. Without Paramiko, file staging is not supported!")

from bigjob import SAGA_BLISS
from bigjob.state import Running, New, Failed, Done, Unknown

if SAGA_BLISS == False:
    try:
        import saga
        logger.debug("Using SAGA C++/Python.")
        is_bliss = False
    except ImportError:
        logger.error("SAGA C++ and Python bindings not found. Using Bliss.")
        import bliss.sagacompat as saga
        is_bliss = True
else:
    logger.debug("Using SAGA Bliss.")
    import bliss.sagacompat as saga
    is_bliss = True


# import other BigJob packages
# import API
import api.base
sys.path.append(os.path.dirname(__file__))

from pbsssh import pbsssh

if sys.version_info < (2, 5):
    sys.path.append(os.path.dirname(__file__) + "/ext/uuid-1.30/")
    sys.stderr.write("Warning: Using unsupported Python version\n")
if sys.version_info < (2, 4):
    sys.path.append(os.path.dirname(__file__) + "/ext/subprocess-2.6.4/")
    sys.stderr.write("Warning: Using unsupported Python version\n")
if sys.version_info < (2, 3):
    sys.stderr.write("Error: Python versions <2.3 not supported\n")
    sys.exit(-1)

import uuid


def get_uuid():
    wd_uuid = ""
    wd_uuid += str(uuid.uuid1())
    return wd_uuid


""" Config parameters (will move to config file in future) """
CLEANUP = True

# for legacy purposes and support for old BJ API
pilot_url_dict = {}   # stores a mapping of pilot_url to bigjob


class BigJobError(Exception):
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)


class bigjob(api.base.bigjob):

    def __init__(self, coordination_url="advert://localhost/"):
        """ Initializes BigJob's coordination system, e.g.:
            advert://localhost (SAGA/Advert SQLITE)
            advert://advert.cct.lsu.edu:8080 (SAGA/Advert POSTGRESQL)
            redis://localhost:6379 (Redis at localhost)
            tcp://localhost (ZMQ)
        """
        self.uuid = "bj-" + str(get_uuid())

        logger.debug("init BigJob w/: " + coordination_url)
        self.coordination_url = coordination_url
        self.coordination = self.__init_coordination(coordination_url)
        __APPLICATION_NAME = "bigjob"
        self.app_url = __APPLICATION_NAME + ":" + str(self.uuid)

        self.state = Unknown
        self.pilot_url = ""
        self.job = None
        self.working_directory = None
        logger.debug("initialized BigJob: " + self.app_url)

    def __init_coordination(self, coordination_url):
        if coordination_url.startswith("advert://") or coordination_url.startswith("sqlasyncadvert://"):
            try:
                from coordination.bigjob_coordination_advert import bigjob_coordination
                logger.debug("Utilizing ADVERT Backend")
            except ImportError:
                logger.error("Advert Backend could not be loaded")
        elif coordination_url.startswith("redis://"):
            try:
                from coordination.bigjob_coordination_redis import bigjob_coordination
                logger.debug("Utilizing Redis Backend")
            except ImportError:
                logger.error("Error loading pyredis.")
        elif coordination_url.startswith("tcp://"):
            try:
                from coordination.bigjob_coordination_zmq import bigjob_coordination
                logger.debug("Utilizing ZMQ Backend")
            except ImportError:
                logger.error("ZMQ Backend not found. Please install ZeroMQ (http://www.zeromq.org/intro:get-the-software) and "
                             + "PYZMQ (http://zeromq.github.com/pyzmq/)")
        else:
            logger.error("No suitable coordination backend found.")

        logger.debug("Parsing URL: " + coordination_url)
        scheme, username, password, host, port, dbtype = self.__parse_url(coordination_url)

        if port == -1:
            port = None
        coordination = bigjob_coordination(server=host, server_port=port, username=username,
                                           password=password, dbtype=dbtype, url_prefix=scheme)
        return coordination

    def start_pilot_job(self,
                        lrms_url,
                        bigjob_agent_executable=None,
                        number_nodes=1,
                        queue=None,
                        project=None,
                        working_directory=None,
                        userproxy=None,
                        walltime=None,
                        processes_per_node=1,
                        filetransfers=None):
        """ Start a batch job (using the SAGA Job API) at the resource manager.
            Currently, the following resource managers are supported:
            fork://localhost/ (default job adaptor)
            gram://qb1.loni.org/jobmanager-pbs (Globus adaptor)
            pbspro://localhost (PBS Pro adaptor)
        """

        if self.job != None:
            raise BigJobError("One BigJob already active. Please stop BigJob first.")

        ##############################################################################
        # initialization of coordination and communication subsystem
        lrms_saga_url = saga.url(lrms_url)
        self.pilot_url = self.app_url + ":" + lrms_saga_url.host
        pilot_url_dict[self.pilot_url] = self

        logger.debug("create pilot job entry on backend server: " + self.pilot_url)
        self.coordination.set_pilot_state(self.pilot_url, str(Unknown), False)

        logger.debug("set pilot state to: " + str(Unknown))
        ##############################################################################

        self.number_nodes = int(number_nodes)

        # create job description
        jd = saga.job.description()

        logger.debug("Adaptor specific modifications: " + str(lrms_saga_url.scheme))
        if lrms_saga_url.scheme == "condorg":
            jd.arguments = ["-a", self.coordination.get_address(), "-b", self.pilot_url]
            logger.debug("\n\n-a " + self.coordination.get_address() + " -b " + self.pilot_url)
            agent_exe = os.path.abspath(os.path.join(os.getcwd(), "..", "bootstrap", "bigjob-condor-bootstrap.py"))
            logger.debug(agent_exe)
            jd.executable = agent_exe
        else:
            bootstrap_script = self.generate_bootstrap_script(self.coordination.get_address(), self.pilot_url)
            if lrms_saga_url.scheme == "gram":
                bootstrap_script = self.escape_rsl(bootstrap_script)
            elif lrms_saga_url.scheme == "pbspro":
                bootstrap_script = self.escape_pbs(bootstrap_script)
            elif lrms_saga_url.scheme == "ssh":
                bootstrap_script = self.escape_ssh(bootstrap_script)
            elif lrms_saga_url.scheme == "pbs-ssh":
                # submit a PBS script which launches the bigjob agent using the ssh adaptor
                bootstrap_script = self.escape_ssh(bootstrap_script)
                # convert walltime in minutes to the PBS representation of time
                hrs = walltime / 60
                minu = walltime % 60
                walltimepbs = str(hrs) + ":" + str(minu) + ":00"
                if number_nodes % processes_per_node == 0:
                    number_nodes = number_nodes / processes_per_node
                else:
                    number_nodes = (number_nodes / processes_per_node) + 1
                pbssshj = pbsssh(bootstrap_script, lrms_saga_url, walltimepbs, number_nodes,
                                 processes_per_node, userproxy, working_directory)
                self.job = pbssshj
                self.job.run()
                return
            elif is_bliss:
                bootstrap_script = self.escape_bliss(bootstrap_script)

            #logger.debug(bootstrap_script)
            if is_bliss == False:
                jd.number_of_processes = str(number_nodes)
                jd.processes_per_host = str(processes_per_node)
            else:
                jd.TotalCPUCount = str(int(number_nodes) * int(processes_per_node))

            jd.spmd_variation = "single"
            #jd.arguments = [bigjob_agent_executable, self.coordination.get_address(), self.pilot_url]
            jd.arguments = ["-c", bootstrap_script]
            jd.executable = "python"

        if queue != None:
            jd.queue = queue
        if project != None:
            jd.job_project = [project]
        if walltime != None:
            jd.wall_time_limit = str(walltime)

        # XXX Isn't the working directory about the remote site?
        if working_directory != None:
            if not os.path.isdir(working_directory) and lrms_saga_url.scheme == "fork":
                os.mkdir(working_directory)
            self.working_directory = working_directory
        else:
            self.working_directory = os.path.expanduser("~")

        jd.working_directory = self.working_directory

        logger.debug("Working directory: " + jd.working_directory)
        jd.output = os.path.join(self.__get_bigjob_working_dir(), "stdout-bigjob_agent.txt")
        jd.error = os.path.join(self.__get_bigjob_working_dir(), "stderr-bigjob_agent.txt")

        # stage BJ input files: build target url
        bigjob_working_directory_url = "ssh://" + lrms_saga_url.host + self.__get_bigjob_working_dir()
        self.__stage_files(filetransfers, bigjob_working_directory_url)

        # submit job
        js = None
        if userproxy != None and userproxy != '':
            s = saga.session()
            os.environ["X509_USER_PROXY"] = userproxy
            ctx = saga.context("x509")
            ctx.set_attribute("UserProxy", userproxy)
            s.add_context(ctx)
            logger.debug("use proxy: " + userproxy)
            js = saga.job.service(s, lrms_saga_url)
        else:
            logger.debug("use standard proxy")
            js = saga.job.service(lrms_saga_url)

        self.job = js.create_job(jd)
        logger.debug("Submit pilot job to: " + str(lrms_saga_url))
        self.job.run()
        #return self.job
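
    # Example invocation (a sketch; host, queue, project, and path names are
    # hypothetical). For the pbs-ssh scheme, walltime is given in minutes and
    # converted above to the PBS H:M:00 notation (e.g. walltime=90 yields
    # "1:30:00"), and number_nodes=16 with processes_per_node=8 is collapsed
    # to a request for 2 physical nodes:
    #
    #   bj.start_pilot_job(lrms_url="pbs-ssh://user@cluster.example.org",
    #                      number_nodes=16,
    #                      processes_per_node=8,
    #                      queue="workq",
    #                      project="myproject",
    #                      working_directory="/work/user/bigjob",
    #                      walltime=90)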

    def generate_bootstrap_script(self, coordination_host, coordination_namespace):
        script = textwrap.dedent("""import sys
import os
import urllib
import sys
import time

start_time = time.time()

home = os.environ["HOME"]

BIGJOB_AGENT_DIR = os.path.join(home, ".bigjob")
if not os.path.exists(BIGJOB_AGENT_DIR): os.mkdir(BIGJOB_AGENT_DIR)
BIGJOB_PYTHON_DIR = BIGJOB_AGENT_DIR + "/python/"
BOOTSTRAP_URL = "https://svn.cct.lsu.edu/repos/saga-projects/applications/bigjob/trunk/generic/bootstrap/bigjob-bootstrap.py"
BOOTSTRAP_FILE = BIGJOB_AGENT_DIR + "/bigjob-bootstrap.py"

try: import saga
except: print "SAGA and SAGA Python bindings not found: BigJob only works w/ non-SAGA backends, e.g. Redis, ZMQ."; print "Python version: ", os.system("python -V"); print "Python path: " + str(sys.path)

sys.path.insert(0, os.getcwd() + "/../")
sys.path.insert(0, os.getcwd() + "/../../")

try: import bigjob.bigjob_agent
except: print "BigJob not installed. Attempting to install it."; opener = urllib.FancyURLopener({}); opener.retrieve(BOOTSTRAP_URL, BOOTSTRAP_FILE); os.system("python " + BOOTSTRAP_FILE + " " + BIGJOB_PYTHON_DIR); activate_this = BIGJOB_PYTHON_DIR + 'bin/activate_this.py'; execfile(activate_this, dict(__file__=activate_this))

# try to import BJ once again
import bigjob.bigjob_agent

# execute bj agent
args = ["bigjob_agent.py", \"%s\", \"%s\"]
print "Bootstrap time: " + str(time.time() - start_time)
print "Starting BigJob agent with the following args: " + str(args)
bigjob_agent = bigjob.bigjob_agent.bigjob_agent(args)
""" % (coordination_host, coordination_namespace))
        return script

    def escape_rsl(self, bootstrap_script):
        logger.debug("Escape RSL")
        bootstrap_script = bootstrap_script.replace("\"", "\"\"")
        return bootstrap_script

    def escape_pbs(self, bootstrap_script):
        logger.debug("Escape PBS")
        bootstrap_script = "\'" + bootstrap_script + "\'"
        return bootstrap_script

    def escape_ssh(self, bootstrap_script):
        logger.debug("Escape SSH")
        bootstrap_script = bootstrap_script.replace("\"", "\\\"")
        bootstrap_script = bootstrap_script.replace("\'", "\\\"")
        bootstrap_script = "\"" + bootstrap_script + "\""
        return bootstrap_script
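
    # For example (illustration only): escape_ssh('echo "hi"') returns the
    # string "echo \"hi\"" including the outer double quotes, i.e. inner
    # quotes are backslash-escaped and the whole script is wrapped so that it
    # survives the remote shell's command line.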

    def escape_bliss(self, bootstrap_script):
        logger.debug("Escape Bliss")
        #bootstrap_script = bootstrap_script.replace("\"", "\\\"")
        bootstrap_script = bootstrap_script.replace("\'", "\"")
        bootstrap_script = "\'" + bootstrap_script + "\'"
        return bootstrap_script

    def add_subjob(self, jd, job_url, job_id):
        logger.debug("Stage input files for sub-job")
        if jd.attribute_exists("filetransfer"):
            try:
                self.__stage_files(jd.filetransfer, self.__get_subjob_working_dir(job_id))
            except:
                logger.error("File Stagein failed. Is Paramiko installed?")
        logger.debug("add subjob to queue of PJ: " + str(self.pilot_url))
        for attempt in range(0, 3):
            try:
                logger.debug("create dictionary for job description. Job-URL: " + job_url)
                # put job description attributes to the coordination backend
                job_dict = {}
                # to accommodate current bug in Bliss (number of processes is not returned by list_attributes)
                job_dict["NumberOfProcesses"] = "1"
                attributes = jd.list_attributes()
                logger.debug("SJ Attributes: " + str(attributes))
                for i in attributes:
                    if jd.attribute_is_vector(i):
                        #logger.debug("Add attribute: " + str(i) + " Value: " + str(jd.get_vector_attribute(i)))
                        vector_attr = []
                        for j in jd.get_vector_attribute(i):
                            vector_attr.append(j)
                        job_dict[i] = vector_attr
                    else:
                        #logger.debug("Add attribute: " + str(i) + " Value: " + jd.get_attribute(i))
                        job_dict[i] = jd.get_attribute(i)

                job_dict["state"] = str(Unknown)
                job_dict["job-id"] = str(job_id)

                #logger.debug("update job description at communication & coordination sub-system")
                self.coordination.set_job(job_url, job_dict)
                self.coordination.queue_job(self.pilot_url, job_url)
                break
            except:
                traceback.print_exc(file=sys.stdout)
                time.sleep(2)
                #raise Exception("Unable to submit job")

    def delete_subjob(self, job_url):
        self.coordination.delete_job(job_url)

    def get_subjob_state(self, job_url):
        return self.coordination.get_job_state(job_url)

    def get_subjob_details(self, job_url):
        return self.coordination.get_job(job_url)

    def get_state(self):
        """ duck typing for get_state of saga.job.job;
            state of the saga job that is used to spawn the pilot agent
        """
        try:
            return self.job.get_state()
        except:
            return None

    def get_state_detail(self):
        """ internal state of the BigJob agent """
        try:
            return self.coordination.get_pilot_state(self.pilot_url)["state"]
        except:
            return None

    def get_free_nodes(self):
        jobs = self.coordination.get_jobs_of_pilot(self.pilot_url)
        number_used_nodes = 0
        for i in jobs:
            job_detail = self.coordination.get_job(i)
            if job_detail != None and job_detail.has_key("state") \
                and job_detail["state"] == str(Running):
                job_np = "1"
                if job_detail.has_key("NumberOfProcesses"):
                    job_np = job_detail["NumberOfProcesses"]
                number_used_nodes = number_used_nodes + int(job_np)
        return (self.number_nodes - number_used_nodes)

    def stop_pilot_job(self):
        """ mark the pilot job as stopped in its advert directory """
        try:
            logger.debug("stop pilot job: " + self.pilot_url)
            self.coordination.set_pilot_state(self.pilot_url, str(Done), True)
            self.job = None
        except:
            pass

    def cancel(self):
        """ duck typing for cancel of saga.cpr.job and saga.job.job """
        logger.debug("Cancel Pilot Job")
        try:
            self.job.cancel()
        except:
            pass
            #traceback.print_stack()
        try:
            self.stop_pilot_job()
            logger.debug("delete pilot job: " + str(self.pilot_url))
            if CLEANUP:
                self.coordination.delete_pilot(self.pilot_url)
        except:
            pass
            #traceback.print_stack()

    ###########################################################################
    # internal methods

    def __parse_url(self, url):
        try:
            surl = saga.url(url)
            host = surl.host
            port = surl.port
            username = surl.username
            password = surl.password
            query = surl.query
            scheme = "%s://" % surl.scheme
        except:
            # fallback URL parser based on the Python urlparse library
            logger.error("URL %s could not be parsed" % url)
            traceback.print_exc(file=sys.stderr)
            result = urlparse.urlparse(url)
            host = result.hostname
            port = result.port
            username = result.username
            password = result.password
            if url.find("?") > 0:
                query = url[url.find("?") + 1:]
            else:
                query = None
            scheme = "%s://" % result.scheme

        return scheme, username, password, host, port, query
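
    # For example (a sketch): __parse_url("redis://user:secret@localhost:6379")
    # returns ("redis://", "user", "secret", "localhost", 6379, None); the last
    # element is the query string, which the caller treats as the dbtype.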

    def __get_bigjob_working_dir(self):
        return os.path.join(self.working_directory, self.uuid)

    def __get_subjob_working_dir(self, sj_id):
        return os.path.join(self.__get_bigjob_working_dir(), sj_id)

    def __stage_files(self, filetransfers, target_url):
        logger.debug("Stage: %s to %s" % (filetransfers, target_url))
        self.__create_remote_directory(target_url)
        if filetransfers == None:
            return
        for i in filetransfers:
            source_file = i
            if i.find(">") > 0:
                source_file = i[:i.find(">")].strip()
            target_url_full = os.path.join(target_url, os.path.basename(source_file))
            logger.debug("Stage: %s to %s" % (source_file, target_url_full))
            self.__third_party_transfer(source_file, target_url_full)
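
    # Each filetransfer entry is a string; an optional "source > target"
    # notation may be used, and only the part before ">" is taken as the
    # source file here. A sketch with hypothetical paths:
    #
    #   self.__stage_files(["/home/user/input.dat > input.dat"],
    #                      "ssh://host.example.org/home/user/agent-dir")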

    def __third_party_transfer(self, source_url, target_url):
        """ Transfers from source URL to machine of PS (target path) """
        result = urlparse.urlparse(source_url)
        source_host = result.netloc
        source_path = result.path

        result = urlparse.urlparse(target_url)
        target_host = result.netloc
        target_path = result.path

        python_script = """import sys
import os
import urllib
import sys
import time
import paramiko

client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect("%s")
sftp = client.open_sftp()
sftp.put("%s", "%s")
""" % (target_host, source_path, target_path)

        logging.debug("Execute: \n%s" % python_script)
        source_client = paramiko.SSHClient()
        source_client.load_system_host_keys()
        source_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        source_client.connect(source_host)
        stdin, stdout, stderr = source_client.exec_command("python -c \'%s\'" % python_script)
        stdin.close()
        logging.debug("************************************************")
        logging.debug("Stdout: %s\nStderr:%s", stdout.read(), stderr.read())
        logging.debug("************************************************")
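
    # In other words (hypothetical hosts): transferring
    # "ssh://src.example.org/tmp/in.dat" to "ssh://dst.example.org/tmp/in.dat"
    # opens an SSH connection to src.example.org and runs the inline paramiko
    # snippet there, which in turn sftp-puts /tmp/in.dat onto dst.example.org.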

    def __create_remote_directory(self, target_url):
        #result = urlparse.urlparse(target_url)
        #target_host = result.netloc
        #target_path = result.path

        # Python 2.6 compatible URL parsing
        scheme = target_url[:target_url.find("://") + 3]
        target_host = target_url[len(scheme):target_url.find("/", len(scheme))]
        target_path = target_url[len(scheme) + len(target_host):]
        if target_host == "localhost":
            # avoid failing if the directory already exists
            if not os.path.exists(target_path):
                os.makedirs(target_path)
        else:
            try:
                client = paramiko.SSHClient()
                client.load_system_host_keys()
                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                client.connect(target_host)
                sftp = client.open_sftp()
                sftp.mkdir(target_path)
                sftp.close()
                client.close()
            except:
                logger.warn("Error creating directory: " + str(target_path)
                            + " at: " + str(target_host) + ". Already exists?")
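
    # For example (a sketch): for target_url "ssh://host.example.org/tmp/bj",
    # the manual split yields scheme "ssh://", target_host "host.example.org",
    # and target_path "/tmp/bj".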

    def __print_traceback(self):
        exc_type, exc_value, exc_traceback = sys.exc_info()
        print "*** print_exception:"
        traceback.print_exception(exc_type, exc_value, exc_traceback,
                                  limit=2, file=sys.stdout)

    def __repr__(self):
        return self.pilot_url

    def __del__(self):
        self.cancel()


class subjob(api.base.subjob):

    def __init__(self, coordination_url=None):
        """Constructor"""
        self.coordination_url = coordination_url
        self.uuid = "sj-" + str(get_uuid())
        self.job_url = None
        self.pilot_url = None
        self.bj = None

    def get_job_url(self, pilot_url):
        self.job_url = pilot_url + ":jobs:" + str(self.uuid)
        return self.job_url

    def submit_job(self, pilot_url, jd):
        """ submit the subjob to the referenced bigjob """
        if self.job_url == None:
            self.job_url = self.get_job_url(pilot_url)

        if self.pilot_url == None:
            self.pilot_url = pilot_url
            self.bj = pilot_url_dict[pilot_url]
        self.bj.add_subjob(jd, self.job_url, self.uuid)
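
    # Usage sketch (assumes a running bigjob instance bj and a hypothetical
    # job description jd):
    #
    #   sj = subjob()
    #   sj.submit_job(bj.pilot_url, jd)
    #   print sj.get_state()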

    def get_state(self, pilot_url=None):
        """ duck typing for saga.job """
        if self.pilot_url == None:
            self.pilot_url = pilot_url
            self.bj = pilot_url_dict[pilot_url]
        return self.bj.get_subjob_state(self.job_url)

    def cancel(self, pilot_url=None):
        logger.debug("delete job: " + self.job_url)
        if self.pilot_url == None:
            self.pilot_url = pilot_url
            self.bj = pilot_url_dict[pilot_url]
        if str(self.bj.get_state()) == "Running":
            self.bj.delete_subjob(self.job_url)

    def get_exe(self, pilot_url=None):
        if self.pilot_url == None:
            self.pilot_url = pilot_url
            self.bj = pilot_url_dict[pilot_url]
        sj = self.bj.get_subjob_details(self.job_url)
        return sj["Executable"]

    def get_arguments(self, pilot_url=None):
        if self.pilot_url == None:
            self.pilot_url = pilot_url
            self.bj = pilot_url_dict[pilot_url]
        sj = self.bj.get_subjob_details(self.job_url)
        #logger.debug("Subjob details: " + str(sj))
        arguments = ""
        for i in sj["Arguments"]:
            arguments = arguments + " " + i
        return arguments

    def __del__(self):
        self.cancel()

    def __repr__(self):
        if self.job_url == None:
            return "None"
        else:
            return self.job_url


class description(saga.job.description):
    pass