1
2
3 """Module bigjob_manager.
4
5 This Module is used to launch jobs via the advert service.
6
7 It assumes that a bigjob_agent.py is available on the remote machine.
8 bigjob_agent.py will poll the advert service for new jobs and run these jobs on the respective
9 machine.
10
11 Background: This approach avoids queueing delays since the bigjob_agent_launcher.py needs to be started via saga.job or saga.cpr
12 only once. All short-running tasks will be started using the protocol implemented by subjob() and bigjob_agent.py
13
14 Installation:
15 Set environment variable BIGJOB_HOME to installation directory
16 """
17
18 import sys
19 from bigjob import logger
20 import time
21 import os
22 import traceback
23 import logging
24 import textwrap
25 import urlparse
26
27 try:
28 import paramiko
29 except:
30 logger.warn("Paramiko not found. Without Paramiko file staging is not supported!")
31
32 from bigjob import SAGA_BLISS
33 from bigjob.state import Running, New, Failed, Done, Unknown
34
# Select the SAGA implementation at import time. Unless SAGA Bliss was
# explicitly requested (SAGA_BLISS flag from the bigjob package), prefer the
# SAGA C++/Python bindings and fall back to Bliss if they are not installed.
# The module-level `is_bliss` flag records which implementation was chosen;
# other code (e.g. start_pilot_job) branches on it because the two APIs
# spell some job-description attributes differently.
if SAGA_BLISS == False:
    try:
        import saga
        logger.debug("Using SAGA C++/Python.")
        is_bliss=False
    except:
        # C++ bindings unavailable -- sagacompat provides an API-compatible
        # pure-Python replacement.
        logger.error("SAGA C++ and Python bindings not found. Using Bliss.")
        import bliss.sagacompat as saga
        is_bliss=True
else:
    logger.debug("Using SAGA Bliss.")
    import bliss.sagacompat as saga
    is_bliss=True
48
49
50
51
52 import api.base
53 sys.path.append(os.path.dirname(__file__))
54
55 from pbsssh import pbsssh
56
# Compatibility shims for ancient interpreters: uuid entered the stdlib in
# Python 2.5 and subprocess in 2.4, so bundled copies are put on sys.path
# for older versions. Anything below 2.3 is refused outright.
if sys.version_info < (2, 5):
    sys.path.append(os.path.dirname( __file__ ) + "/ext/uuid-1.30/")
    sys.stderr.write("Warning: Using unsupported Python version\n")
if sys.version_info < (2, 4):
    sys.path.append(os.path.dirname( __file__ ) + "/ext/subprocess-2.6.4/")
    sys.stderr.write("Warning: Using unsupported Python version\n")
if sys.version_info < (2, 3):
    sys.stderr.write("Error: Python versions <2.3 not supported\n")
    sys.exit(-1)
66
67 import uuid
68
70 wd_uuid=""
71 wd_uuid += str(uuid.uuid1())
72 return wd_uuid
73
74
75 """ Config parameters (will move to config file in future) """
76 CLEANUP=True
77
78
79 pilot_url_dict={}
80
81
82
86
88 return repr(self.value)
89
90
92
93 - def __init__(self, coordination_url="advert://localhost/"):
94 """ Initializes BigJob's coordination system
95 e.g.:
96 advert://localhost (SAGA/Advert SQLITE)
97 advert://advert.cct.lsu.edu:8080 (SAGA/Advert POSTGRESQL)
98 redis://localhost:6379 (Redis at localhost)
99 tcp://localhost (ZMQ)
100 """
101
102 self.uuid = "bj-" + str(get_uuid())
103
104 logger.debug("init BigJob w/: " + coordination_url)
105 self.coordination_url = coordination_url
106 self.coordination = self.__init_coordination(coordination_url)
107 __APPLICATION_NAME="bigjob"
108 self.app_url = __APPLICATION_NAME +":" + str(self.uuid)
109
110 self.state=Unknown
111 self.pilot_url=""
112 self.job = None
113 self.working_directory = None
114 logger.debug("initialized BigJob: " + self.app_url)
115
116
118 if(coordination_url.startswith("advert://") or coordination_url.startswith("sqlasyncadvert://")):
119 try:
120 from coordination.bigjob_coordination_advert import bigjob_coordination
121 logger.debug("Utilizing ADVERT Backend")
122 except:
123 logger.error("Advert Backend could not be loaded")
124 elif (coordination_url.startswith("redis://")):
125 try:
126 from coordination.bigjob_coordination_redis import bigjob_coordination
127 logger.debug("Utilizing Redis Backend")
128 except:
129 logger.error("Error loading pyredis.")
130 elif (coordination_url.startswith("tcp://")):
131 try:
132 from coordination.bigjob_coordination_zmq import bigjob_coordination
133 logger.debug("Utilizing ZMQ Backend")
134 except:
135 logger.error("ZMQ Backend not found. Please install ZeroMQ (http://www.zeromq.org/intro:get-the-software) and "
136 +"PYZMQ (http://zeromq.github.com/pyzmq/)")
137 else:
138 logger.error("No suitable coordination backend found.")
139
140 logger.debug("Parsing URL: " + coordination_url)
141 scheme, username, password, host, port, dbtype = self.__parse_url(coordination_url)
142
143 if port == -1:
144 port = None
145 coordination = bigjob_coordination(server=host, server_port=port, username=username,
146 password=password, dbtype=dbtype, url_prefix=scheme)
147 return coordination
148
149
150
    def start_pilot_job(self,
                        lrms_url,
                        bigjob_agent_executable=None,
                        number_nodes=1,
                        queue=None,
                        project=None,
                        working_directory=None,
                        userproxy=None,
                        walltime=None,
                        processes_per_node=1,
                        filetransfers=None):
        """ Start a batch job (using SAGA Job API) at resource manager. Currently, the following resource manager are supported:
            fork://localhost/ (Default Job Adaptor)
            gram://qb1.loni.org/jobmanager-pbs (Globus Adaptor)
            pbspro://localhost (PBS Prop Adaptor)

        Parameters:
            lrms_url      -- resource manager URL; the scheme selects the adaptor-specific handling below
            bigjob_agent_executable -- unused in this code path (kept for API compatibility)
            number_nodes  -- number of processes/nodes requested for the pilot
            queue, project -- batch queue and allocation passed to the job description
            working_directory -- agent working directory (auto-created only for fork://)
            userproxy     -- X.509 proxy file; when set, submission uses an x509 SAGA context
            walltime      -- requested walltime, in minutes (presumably; see pbs-ssh branch) -- TODO confirm
            processes_per_node -- processes per host
            filetransfers -- optional file-staging directives handed to __stage_files

        Side effects: registers self in module-level pilot_url_dict and creates
        the pilot entry in the coordination backend.
        """
        if self.job != None:
            raise BigJobError("One BigJob already active. Please stop BigJob first.")
            return  # NOTE(review): unreachable -- the raise above always exits

        lrms_saga_url = saga.url(lrms_url)
        # Pilot id: application URL plus the target host.
        self.pilot_url = self.app_url + ":" + lrms_saga_url.host
        pilot_url_dict[self.pilot_url]=self

        logger.debug("create pilot job entry on backend server: " + self.pilot_url)
        self.coordination.set_pilot_state(self.pilot_url, str(Unknown), False)
        logger.debug("set pilot state to: " + str(Unknown))

        self.number_nodes=int(number_nodes)

        # SAGA job description populated below and submitted at the end.
        jd = saga.job.description()

        # Adaptor-specific modifications: each scheme needs the bootstrap
        # script quoted/escaped differently.
        logger.debug("Adaptor specific modifications: " + str(lrms_saga_url.scheme))
        if lrms_saga_url.scheme == "condorg":
            # Condor-G: run a pre-installed bootstrap script, passing the
            # coordination address (-a) and the pilot URL (-b).
            jd.arguments = [ "-a", self.coordination.get_address(), "-b",self.pilot_url]
            # NOTE(review): logger.debug with several positional string args
            # treats the extras as %-format parameters; this call likely
            # produces a logging formatting error at runtime.
            logger.debug("\n\n-a", self.coordination.get_address(),"-b", self.pilot_url)
            agent_exe = os.path.abspath(os.path.join(os.getcwd(),"..","bootstrap","bigjob-condor-bootstrap.py"))
            logger.debug(agent_exe)
            jd.executable = agent_exe
        else:
            # All other adaptors: generate a self-contained bootstrap script
            # and run it via "python -c".
            bootstrap_script = self.generate_bootstrap_script(self.coordination.get_address(), self.pilot_url)
            if lrms_saga_url.scheme == "gram":
                bootstrap_script = self.escape_rsl(bootstrap_script)
            elif lrms_saga_url.scheme == "pbspro":
                bootstrap_script = self.escape_pbs(bootstrap_script)
            elif lrms_saga_url.scheme == "ssh":
                bootstrap_script = self.escape_ssh(bootstrap_script)
            elif lrms_saga_url.scheme == "pbs-ssh":
                # PBS-over-SSH bypasses SAGA and submits through the local
                # pbsssh helper; this branch returns early.
                bootstrap_script = self.escape_ssh(bootstrap_script)
                # walltime (minutes) -> "H:M:00" PBS string.
                # NOTE(review): minutes < 10 render e.g. "1:5:00" (no zero
                # padding), and walltime=None raises TypeError here.
                hrs=walltime/60
                minu=walltime%60
                walltimepbs=""+str(hrs)+":"+str(minu)+":00"
                # Convert total process count to node count, rounding up.
                if number_nodes%processes_per_node == 0:
                    number_nodes = number_nodes/processes_per_node
                else:
                    number_nodes = ( number_nodes/processes_per_node) + 1
                pbssshj = pbsssh(bootstrap_script,lrms_saga_url, walltimepbs,number_nodes,processes_per_node,userproxy,working_directory)
                self.job = pbssshj
                self.job.run()
                return
            elif is_bliss:
                bootstrap_script = self.escape_bliss(bootstrap_script)

            # Bliss and classic SAGA spell process-count attributes differently.
            if is_bliss==False:
                jd.number_of_processes = str(number_nodes)
                jd.processes_per_host=str(processes_per_node)
            else:
                jd.TotalCPUCount=str(int(number_nodes)*int(processes_per_node))

            jd.spmd_variation = "single"
            jd.arguments = ["-c", bootstrap_script]
            jd.executable = "python"

        # NOTE(review): indentation of this source was garbled; the generic
        # attributes and submission below are assumed to apply to all
        # non-pbs-ssh schemes (including condorg) -- verify against upstream.
        if queue != None:
            jd.queue = queue
        if project !=None:
            jd.job_project = [project]
        if walltime!=None:
            jd.wall_time_limit=str(walltime)

        # Working directory is only auto-created for local (fork) submission;
        # remote directories are created during file staging instead.
        if working_directory != None:
            if not os.path.isdir(working_directory) and lrms_saga_url.scheme=="fork":
                os.mkdir(working_directory)
            self.working_directory = working_directory
        else:
            self.working_directory = os.path.expanduser("~")

        jd.working_directory = self.working_directory

        logger.debug("Working directory: " + jd.working_directory)
        jd.output = os.path.join(self.__get_bigjob_working_dir(), "stdout-bigjob_agent.txt")
        jd.error = os.path.join(self.__get_bigjob_working_dir(),"stderr-bigjob_agent.txt")

        # Stage input files into the remote BigJob working directory via SSH.
        bigjob_working_directory_url = "ssh://" + lrms_saga_url.host + self.__get_bigjob_working_dir()
        self.__stage_files(filetransfers, bigjob_working_directory_url)

        # Submit through a SAGA job service; attach an x509 context when a
        # user proxy was supplied.
        js = None
        if userproxy != None and userproxy != '':
            s = saga.session()
            os.environ["X509_USER_PROXY"]=userproxy
            ctx = saga.context("x509")
            ctx.set_attribute ("UserProxy", userproxy)
            s.add_context(ctx)
            logger.debug("use proxy: " + userproxy)
            js = saga.job.service(s, lrms_saga_url)
        else:
            logger.debug("use standard proxy")
            js = saga.job.service(lrms_saga_url)

        self.job = js.create_job(jd)
        logger.debug("Submit pilot job to: " + str(lrms_saga_url))
        self.job.run()
281
282
284 script = textwrap.dedent("""import sys
285 import os
286 import urllib
287 import sys
288 import time
289
290 start_time = time.time()
291
292 home = os.environ["HOME"]
293
294 BIGJOB_AGENT_DIR= os.path.join(home, ".bigjob")
295 if not os.path.exists(BIGJOB_AGENT_DIR): os.mkdir (BIGJOB_AGENT_DIR)
296 BIGJOB_PYTHON_DIR=BIGJOB_AGENT_DIR+"/python/"
297 BOOTSTRAP_URL="https://svn.cct.lsu.edu/repos/saga-projects/applications/bigjob/trunk/generic/bootstrap/bigjob-bootstrap.py"
298 BOOTSTRAP_FILE=BIGJOB_AGENT_DIR+"/bigjob-bootstrap.py"
299
300 try: import saga
301 except: print "SAGA and SAGA Python Bindings not found: BigJob only work w/ non-SAGA backends e.g. Redis, ZMQ.";print "Python version: ", os.system("python -V");print "Python path: " + str(sys.path)
302
303 sys.path.insert(0, os.getcwd() + "/../")
304 sys.path.insert(0, os.getcwd() + "/../../")
305
306 try: import bigjob.bigjob_agent
307 except: print "BigJob not installed. Attempting to install it."; opener = urllib.FancyURLopener({}); opener.retrieve(BOOTSTRAP_URL, BOOTSTRAP_FILE); os.system("python " + BOOTSTRAP_FILE + " " + BIGJOB_PYTHON_DIR); activate_this = BIGJOB_PYTHON_DIR+'bin/activate_this.py'; execfile(activate_this, dict(__file__=activate_this))
308
309 #try to import BJ once again
310 import bigjob.bigjob_agent
311
312 # execute bj agent
313 args = ["bigjob_agent.py", \"%s\", \"%s\"]
314 print "Bootstrap time: " + str(time.time()-start_time)
315 print "Starting BigJob Agents with following args: " + str(args)
316 bigjob_agent = bigjob.bigjob_agent.bigjob_agent(args)
317 """ % (coordination_host, coordination_namespace))
318 return script
319
321 logger.debug("Escape RSL")
322 bootstrap_script = bootstrap_script.replace("\"", "\"\"")
323 return bootstrap_script
324
325
327 logger.debug("Escape PBS")
328 bootstrap_script = "\'" + bootstrap_script+ "\'"
329 return bootstrap_script
330
331
333 logger.debug("Escape SSH")
334 bootstrap_script = bootstrap_script.replace("\"", "\\\"")
335 bootstrap_script = bootstrap_script.replace("\'", "\\\"")
336 bootstrap_script = "\"" + bootstrap_script+ "\""
337 return bootstrap_script
338
340 logger.debug("Escape fork")
341
342 bootstrap_script = bootstrap_script.replace("\'", "\"")
343 bootstrap_script = "\'" + bootstrap_script+ "\'"
344 return bootstrap_script
345
346
348 logger.debug("Stage input files for sub-job")
349 if jd.attribute_exists ("filetransfer"):
350 try:
351 self.__stage_files(jd.filetransfer, self.__get_subjob_working_dir(job_id))
352 except:
353 logger.error("File Stagein failed. Is Paramiko installed?")
354 logger.debug("add subjob to queue of PJ: " + str(self.pilot_url))
355 for i in range(0,3):
356 try:
357 logger.debug("create dictionary for job description. Job-URL: " + job_url)
358
359 job_dict = {}
360
361 job_dict["NumberOfProcesses"] = "1"
362 attributes = jd.list_attributes()
363 logger.debug("SJ Attributes: " + str(attributes))
364 for i in attributes:
365 if jd.attribute_is_vector(i):
366
367 vector_attr = []
368 for j in jd.get_vector_attribute(i):
369 vector_attr.append(j)
370 job_dict[i]=vector_attr
371 else:
372
373 job_dict[i] = jd.get_attribute(i)
374
375 job_dict["state"] = str(Unknown)
376 job_dict["job-id"] = str(job_id)
377
378
379 self.coordination.set_job(job_url, job_dict)
380 self.coordination.queue_job(self.pilot_url, job_url)
381 break
382 except:
383 traceback.print_exc(file=sys.stdout)
384 time.sleep(2)
385
386
388 self.coordination.delete_job(job_url)
389
391 return self.coordination.get_job_state(job_url)
392
394 return self.coordination.get_job(job_url)
395
397 """ duck typing for get_state of saga.job.job
398 state of saga job that is used to spawn the pilot agent
399 """
400 try:
401 return self.job.get_state()
402 except:
403 return None
404
406 """ internal state of BigJob agent """
407 try:
408 return self.coordination.get_pilot_state(self.pilot_url)["state"]
409 except:
410 return None
411
413 jobs = self.coordination.get_jobs_of_pilot(self.pilot_url)
414 number_used_nodes=0
415 for i in jobs:
416 job_detail = self.coordination.get_job(i)
417 if job_detail != None and job_detail.has_key("state") == True\
418 and job_detail["state"]==str(Running):
419 job_np = "1"
420 if (job_detail["NumberOfProcesses"] == True):
421 job_np = job_detail["NumberOfProcesses"]
422 number_used_nodes=number_used_nodes + int(job_np)
423 return (self.number_nodes - number_used_nodes)
424
425
427 """ mark in advert directory of pilot-job as stopped """
428 try:
429 logger.debug("stop pilot job: " + self.pilot_url)
430 self.coordination.set_pilot_state(self.pilot_url, str(Done), True)
431 self.job=None
432 except:
433 pass
434
436 """ duck typing for cancel of saga.cpr.job and saga.job.job """
437 logger.debug("Cancel Pilot Job")
438 try:
439 self.job.cancel()
440 except:
441 pass
442
443 try:
444 self.stop_pilot_job()
445 logger.debug("delete pilot job: " + str(self.pilot_url))
446 if CLEANUP:
447 self.coordination.delete_pilot(self.pilot_url)
448 except:
449 pass
450
451
452
453
454
455
457 try:
458 surl = saga.url(url)
459 host = surl.host
460 port = surl.port
461 username = surl.username
462 password = surl.password
463 query = surl.query
464 scheme = "%s://"%surl.scheme
465 except:
466 """ Fallback URL parser based on Python urlparse library """
467 logger.error("URL %s could not be parsed")
468 traceback.print_exc(file=sys.stderr)
469 result = urlparse.urlparse(url)
470 host = result.hostname
471 port = result.port
472 username = result.username
473 password = result.password
474 if url.find("?")>0:
475 query = url[url.find("?")+1:]
476 else:
477 query = None
478 scheme = "%s://"%result.scheme
479
480 return scheme, username, password, host, port, query
481
482
484 return os.path.join(self.working_directory, self.uuid)
485
486
489
490
492 logger.debug("Stage: %s to %s"%(filetransfers, target_url))
493 self.__create_remote_directory(target_url)
494 if filetransfers==None:
495 return
496 for i in filetransfers:
497 source_file=i
498 if i.find(">")>0:
499 source_file = i[:i.find(">")].strip()
500 target_url_full = os.path.join(target_url, os.path.basename(source_file))
501 logger.debug("Stage: %s to %s"%(source_file, target_url_full))
502 self.__third_party_transfer(source_file, target_url_full)
503
504
506 """
507 Transfers from source URL to machine of PS (target path)
508 """
509 result = urlparse.urlparse(source_url)
510 source_host = result.netloc
511 source_path = result.path
512
513 result = urlparse.urlparse(target_url)
514 target_host = result.netloc
515 target_path = result.path
516
517 python_script= """import sys
518 import os
519 import urllib
520 import sys
521 import time
522 import paramiko
523
524 client = paramiko.SSHClient()
525 client.load_system_host_keys()
526 client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
527 client.connect("%s")
528 sftp = client.open_sftp()
529 sftp.put("%s", "%s")
530 """%(target_host, source_path, target_path)
531
532 logging.debug("Execute: \n%s"%python_script)
533 source_client = paramiko.SSHClient()
534 source_client.load_system_host_keys()
535 source_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
536 source_client.connect(source_host)
537 stdin, stdout, stderr = source_client.exec_command("python -c \'%s\'"%python_script)
538 stdin.close()
539 logging.debug("************************************************")
540 logging.debug("Stdout: %s\nStderr:%s", stdout.read(), stderr.read())
541 logging.debug("************************************************")
542
543
545
546
547
548
549
550 scheme = target_url[:target_url.find("://")+3]
551 target_host = target_url[len(scheme):target_url.find("/", len(scheme))]
552 target_path = target_url[len(scheme)+len(target_host):]
553 if target_host == "localhost":
554 os.makedirs(target_path)
555 else:
556 try:
557 client = paramiko.SSHClient()
558 client.load_system_host_keys()
559 client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
560 client.connect(target_host)
561 sftp = client.open_sftp()
562 sftp.mkdir(target_path)
563 sftp.close()
564 client.close()
565 except:
566 logger.warn("Error creating directory: " + str(target_path)
567 + " at: " + str(target_host) + " Already exists?" )
568
569
570
572 exc_type, exc_value, exc_traceback = sys.exc_info()
573 print "*** print_exception:"
574 traceback.print_exception(exc_type, exc_value, exc_traceback,
575 limit=2, file=sys.stdout)
576
578 return self.pilot_url
579
582
583
584
585
586 -class subjob(api.base.subjob):
587
588 - def __init__(self, coordination_url=None):
589 """Constructor"""
590 self.coordination_url = coordination_url
591 self.job_url=None
592 self.uuid = "sj-" + str(get_uuid())
593 self.job_url = None
594 self.pilot_url = None
595 self.bj = None
596
598 self.job_url = pilot_url + ":jobs:" + str(self.uuid)
599 return self.job_url
600
601
603 """ submit subjob to referenced bigjob """
604 if self.job_url==None:
605 self.job_url=self.get_job_url(pilot_url)
606
607 if self.pilot_url==None:
608 self.pilot_url = pilot_url
609 self.bj=pilot_url_dict[pilot_url]
610 self.bj.add_subjob(jd, self.job_url, self.uuid)
611
612
614 """ duck typing for saga.job """
615 if self.pilot_url==None:
616 self.pilot_url = pilot_url
617 self.bj=pilot_url_dict[pilot_url]
618 return self.bj.get_subjob_state(self.job_url)
619
620
621 - def cancel(self, pilot_url=None):
622 logger.debug("delete job: " + self.job_url)
623 if self.pilot_url==None:
624 self.pilot_url = pilot_url
625 self.bj=pilot_url_dict[pilot_url]
626 if str(self.bj.get_state())=="Running":
627 self.bj.delete_subjob(self.job_url)
628
629 - def get_exe(self, pilot_url=None):
630 if self.pilot_url==None:
631 self.pilot_url = pilot_url
632 self.bj=pilot_url_dict[pilot_url]
633 sj = self.bj.get_subjob_details(self.job_url)
634 return sj["Executable"]
635
637 if self.pilot_url==None:
638 self.pilot_url = pilot_url
639 self.bj=pilot_url_dict[pilot_url]
640 sj = self.bj.get_subjob_details(self.job_url)
641
642 arguments=""
643 for i in sj["Arguments"]:
644 arguments = arguments + " " + i
645 return arguments
646
649
651 if(self.job_url==None):
652 return "None"
653 else:
654 return self.job_url
655
656
659