"""bigjob_agent: BigJob agent that is executed on the resource
"""

import sys
import os
import bigjob.state
import socket
import threading
import time
import pdb
import traceback
import ConfigParser
import types
import logging
logging.basicConfig(level=logging.DEBUG)

try:
    import saga
except ImportError:
    logging.warning("SAGA could not be found. Not all functionality will work.")

sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../ext/threadpool-1.2.7/src/")
logging.debug(str(sys.path))
from threadpool import *
from bigjob import logger

if sys.version_info < (2, 5):
    sys.path.append(os.path.dirname(__file__) + "/../../ext/uuid-1.30/")
    sys.stderr.write("Warning: Using unsupported Python version\n")
if sys.version_info < (2, 4):
    sys.path.append(os.path.dirname(__file__) + "/../../ext/subprocess-2.6.4/")
    sys.stderr.write("Warning: Using unsupported Python version\n")
if sys.version_info < (2, 3):
    sys.stderr.write("Warning: Python versions <2.3 not supported\n")
    sys.exit(-1)

import subprocess

""" Config parameters (will move to config file in future) """
CONFIG_FILE = "bigjob_agent.conf"
THREAD_POOL_SIZE = 4
APPLICATION_NAME = "bigjob"

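# Example bigjob_agent.conf as read by the constructor below (a sketch; the
# keys are expected in the [DEFAULT] section, the values are site-specific):
#   [DEFAULT]
#   cpr = false
#   shell = /bin/bash
#   mpirun = mpirun
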
class bigjob_agent:

    """BigJob Agent:
       - reads new job information from the communication and coordination subsystem (e.g. Redis)
       - starts new jobs
       - monitors running jobs """

    """Constructor"""
    def __init__(self, args):
        self.coordination_url = args[1]
        # bookkeeping for sub-jobs, processes, and nodes
        self.jobs = []
        self.processes = {}
        self.freenodes = []
        self.busynodes = []
        self.restarted = {}

        # read the agent configuration file
        conf_file = os.path.dirname(os.path.abspath(__file__)) + "/../" + CONFIG_FILE
        config = ConfigParser.ConfigParser()
        logging.debug("read config file: " + conf_file)
        config.read(conf_file)
        default_dict = config.defaults()
        self.CPR = default_dict["cpr"]
        self.SHELL = default_dict["shell"]
        self.MPIRUN = default_dict["mpirun"]
        logging.debug("cpr: " + self.CPR + " mpi: " + self.MPIRUN + " shell: " + self.SHELL)

        # initialize the resource management system (PBS/SGE/local)
        self.init_rms()
        self.failed_polls = 0

        # initialize the communication and coordination subsystem
        self.base_url = args[2]
        self.id = self.__get_bj_id(self.base_url)
        logger.debug("BigJob Agent arguments: " + str(args))
        logger.debug("Initialize C&C subsystem to pilot-url: " + self.base_url)
        logger.debug("BigJob ID: %s" % self.id)

        # create and change to the agent working directory
        self.bj_dir = os.path.join(os.getcwd(), self.id)
        try:
            os.makedirs(self.bj_dir)
        except OSError:
            logger.debug("Directory already exists.")
        os.chdir(self.bj_dir)

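        # Example coordination URLs and the backend each scheme selects below
        # (hosts and ports are illustrative):
        #   advert://advert.cct.lsu.edu:8080  -> SAGA Advert backend
        #   redis://localhost:6379            -> Redis backend
        #   tcp://localhost:5555              -> ZMQ backend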
        if (self.coordination_url.startswith("advert://")
                or self.coordination_url.startswith("sqlasyncadvert://")):
            try:
                from coordination.bigjob_coordination_advert import bigjob_coordination
                logging.debug("Utilizing ADVERT Backend: " + self.coordination_url)
            except:
                logger.error("Advert Backend could not be loaded")
                exc_type, exc_value, exc_traceback = sys.exc_info()
                traceback.print_exc(file=sys.stderr)
                traceback.print_tb(exc_traceback, file=sys.stderr)
        elif self.coordination_url.startswith("redis://"):
            try:
                from coordination.bigjob_coordination_redis import bigjob_coordination
                logger.debug("Utilizing Redis Backend: " + self.coordination_url + ". Please make sure the Redis server is configured in bigjob_coordination_redis.py")
            except:
                logger.error("Error loading pyredis.")
        elif self.coordination_url.startswith("tcp://"):
            try:
                from coordination.bigjob_coordination_zmq import bigjob_coordination
                logger.debug("Utilizing ZMQ Backend")
            except:
                logger.error("ZMQ Backend not found. Please install ZeroMQ (http://www.zeromq.org/intro:get-the-software) and "
                             + "PYZMQ (http://zeromq.github.com/pyzmq/)")

        self.coordination = bigjob_coordination(server_connect_url=self.coordination_url)

        # update the state of the pilot job to running
        self.coordination.set_pilot_state(self.base_url, str(bigjob.state.Running), False)

        # start background threads for launching and monitoring sub-jobs
        self.resource_lock = threading.RLock()
        self.threadpool = ThreadPool(THREAD_POOL_SIZE)

        self.launcher_thread = threading.Thread(target=self.dequeue_new_jobs)
        self.launcher_thread.start()

        self.monitoring_thread = threading.Thread(target=self.start_background_thread)
        self.monitoring_thread.start()

    def __get_bj_id(self, url):
        """ parse the BigJob id (bj-...) out of the pilot URL """
        logger.debug("parsing ID out of URL: %s" % url)
        start = url.index("bj-")
        end = url.index(":", start)
        bj_id = url[start:end]
        return bj_id

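    # Example (a sketch; the URL is illustrative): for a pilot URL such as
    #   redis://localhost:6379/bigjob:bj-1234-abcd:localhost
    # __get_bj_id() returns the substring from "bj-" up to the next ":",
    # i.e. "bj-1234-abcd".
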
    def init_rms(self):
        """ detect the resource management system and initialize the free nodes list """
        if os.environ.get("PBS_NODEFILE") != None:
            return self.init_pbs()
        elif os.environ.get("PE_HOSTFILE") != None:
            return self.init_sge()
        else:
            return self.init_local()

    def init_local(self):
        """ initialize free nodes list with dummy (for fork jobs) """
        try:
            num_cpus = self.get_num_cpus()
            for i in range(0, num_cpus):
                self.freenodes.append("localhost\n")
        except IOError:
            self.freenodes = ["localhost\n"]

    def init_sge(self):
        """ initialize free nodes list from SGE environment """
        sge_node_file = os.environ.get("PE_HOSTFILE")
        if sge_node_file == None:
            return
        f = open(sge_node_file)
        sgenodes = f.readlines()
        f.close()
        for i in sgenodes:
            # columns[0] is the hostname, columns[1] the number of slots on it
            columns = i.split()
            try:
                for j in range(0, int(columns[1])):
                    logger.debug("add host: " + columns[0].strip())
                    self.freenodes.append(columns[0] + "\n")
            except:
                pass
        return self.freenodes

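    # Example PE_HOSTFILE contents as parsed above (illustrative; the assumed
    # line format is "<hostname> <slots> <queue> <processor-range>"):
    #   node001 8 all.q@node001 UNDEFINED
    #   node002 8 all.q@node002 UNDEFINED
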
    def init_pbs(self):
        """ initialize free nodes list from PBS environment """
        pbs_node_file = os.environ.get("PBS_NODEFILE")
        if pbs_node_file == None:
            return
        f = open(pbs_node_file)
        self.freenodes = f.readlines()
        f.close()

        # PBS_NODEFILE contains one line per core; make sure at least
        # num_cpus entries exist per distinct host
        num_cpus = self.get_num_cpus()
        node_dict = {}
        for i in set(self.freenodes):
            node_dict[i] = self.freenodes.count(i)
            if node_dict[i] < num_cpus:
                node_dict[i] = num_cpus

        self.freenodes = []
        for i in node_dict.keys():
            logger.debug("host: " + i + " nodes: " + str(node_dict[i]))
            for j in range(0, node_dict[i]):
                logger.debug("add host: " + i.strip())
                self.freenodes.append(i)

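    # Example PBS_NODEFILE contents as read above (illustrative; PBS writes
    # one line per allocated core):
    #   node001
    #   node001
    #   node002
    #   node002
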
    def get_num_cpus(self):
        """ count the processors listed in /proc/cpuinfo """
        cpuinfo = open("/proc/cpuinfo", "r")
        cpus = cpuinfo.readlines()
        cpuinfo.close()
        num = 0
        for i in cpus:
            if i.startswith("processor"):
                num = num + 1
        return num

    def execute_job(self, job_url, job_dict):
        """ obtain job attributes from c&c and execute process """
        state = str(job_dict["state"])

        if (state == str(bigjob.state.Unknown)
                or state == str(bigjob.state.New)):
            try:
                job_id = job_dict["job-id"]
                logger.debug("Start job id %s specification %s" % (job_id, str(job_dict)))

                numberofprocesses = "1"
                if job_dict.has_key("NumberOfProcesses"):
                    numberofprocesses = job_dict["NumberOfProcesses"]

                spmdvariation = "single"
                if job_dict.has_key("SPMDVariation"):
                    spmdvariation = job_dict["SPMDVariation"]

                arguments = ""
                if job_dict.has_key("Arguments"):
                    arguments_raw = job_dict["Arguments"]
                    if type(arguments_raw) == types.ListType:
                        arguments_list = arguments_raw
                    else:
                        arguments_list = eval(job_dict["Arguments"])
                    for i in arguments_list:
                        arguments = arguments + " " + i

                environment = os.environ
                envi = ""
                if job_dict.has_key("Environment"):
                    env_raw = job_dict["Environment"]
                    if type(env_raw) == types.ListType:
                        env_list = env_raw
                    else:
                        env_list = eval(job_dict["Environment"])
                    for i in env_list:
                        envi = envi + "export " + i + "; "

                executable = job_dict["Executable"]

                workingdirectory = os.path.join(os.getcwd(), job_id)
                if job_dict.has_key("WorkingDirectory"):
                    workingdirectory = job_dict["WorkingDirectory"]
                try:
                    os.makedirs(workingdirectory)
                except OSError:
                    logger.debug("Directory %s already exists." % workingdirectory)
                logging.debug("Sub-Job: %s, Working_directory: %s" % (job_id, workingdirectory))

                output = "stdout"
                if job_dict.has_key("Output"):
                    output = job_dict["Output"]
                if not os.path.isabs(output):
                    output = os.path.join(workingdirectory, output)

                error = os.path.join(workingdirectory, "stderr")
                if job_dict.has_key("Error"):
                    error = job_dict["Error"]
                if not os.path.isabs(error):
                    error = os.path.join(workingdirectory, error)

                # append job to the list of managed sub-jobs
                self.jobs.append(job_url)

                # open stdout/stderr files for the sub-job
                output_file = os.path.abspath(output)
                error_file = os.path.abspath(error)
                logger.debug("stdout: " + output_file + " stderr: " + error_file
                             + " env: " + str(environment))
                stdout = open(output_file, "w")
                stderr = open(error_file, "w")
                if spmdvariation.lower() != "mpi":
                    command = envi + executable + " " + arguments
                else:
                    # for MPI jobs the environment is exported in the mpirun command below
                    command = executable + " " + arguments

                # allocate nodes; re-queue the sub-job if not enough resources are free
                machinefile = self.allocate_nodes(job_dict)
                if machinefile == None:
                    logger.debug("Not enough resources to run: " + job_url)
                    self.coordination.queue_job(self.base_url, job_url)
                    return

                # determine the host for ssh execution (first node of the machinefile)
                host = "localhost"
                try:
                    machine_file_handler = open(machinefile, "r")
                    node = machine_file_handler.readlines()
                    machine_file_handler.close()
                    host = node[0].strip()
                except:
                    pass

                # build the execution command
                if spmdvariation.lower() == "mpi":
                    command = "cd " + workingdirectory + "; " + envi + self.MPIRUN + " -np " + numberofprocesses + " -machinefile " + machinefile + " " + command
                else:
                    command = "ssh " + host + " \"cd " + workingdirectory + "; " + command + "\""
                shell = self.SHELL
                logger.debug("execute: " + command + " in " + workingdirectory + " from: " + str(socket.gethostname()) + " (Shell: " + shell + ")")

                p = subprocess.Popen(args=command, executable=shell, stderr=stderr,
                                     stdout=stdout, cwd=workingdirectory,
                                     env=environment, shell=True)
                logger.debug("started " + command)
                self.processes[job_url] = p
                self.coordination.set_job_state(job_url, str(bigjob.state.Running))
            except:
                traceback.print_exc(file=sys.stderr)

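    # Example sub-job dictionary as consumed by execute_job() above (a sketch;
    # the keys mirror the accesses in the method, the values are illustrative):
    #   {"state": "New", "job-id": "sj-1", "Executable": "/bin/echo",
    #    "Arguments": ["hello"], "NumberOfProcesses": "2", "SPMDVariation": "mpi",
    #    "Output": "stdout.txt", "Error": "stderr.txt"}
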
    def allocate_nodes(self, job_dict):
        """ allocate nodes
            allocated nodes will be written to machinefile advert-launcher-machines-<jobid>;
            returns None if not enough free nodes are available
        """
        self.resource_lock.acquire()
        number_nodes = int(job_dict["NumberOfProcesses"])
        nodes = []
        machine_file_name = None
        if len(self.freenodes) >= number_nodes:
            unique_nodes = set(self.freenodes)
            for i in unique_nodes:
                number = self.freenodes.count(i)
                logger.debug("allocate: " + i + " number nodes: " + str(number)
                             + " current busy nodes: " + str(self.busynodes)
                             + " free nodes: " + str(self.freenodes))
                for j in range(0, number):
                    if number_nodes > 0:
                        nodes.append(i)
                        self.freenodes.remove(i)
                        self.busynodes.append(i)
                        number_nodes = number_nodes - 1
                    else:
                        break

            machine_file_name = self.get_machine_file_name(job_dict)
            machine_file = open(machine_file_name, "w")
            machine_file.writelines(nodes)
            machine_file.close()
            logger.debug("wrote machinefile: " + machine_file_name + " Nodes: " + str(nodes))

        self.resource_lock.release()
        return machine_file_name

    def setup_charmpp_nodefile(self, allocated_nodes):
        """ Setup charm++ nodefile to use for executing NAMD
            HACK!! Method violates layering principle
            File $HOME/machinefile in charm++ nodefile format is written to the first node in the list
        """
        # build a charm++ nodefile entry for each allocated node
        nodefile_string = ""
        for i in allocated_nodes:
            if i.has_key("private_hostname"):
                nodefile_string = nodefile_string + "host " + i["private_hostname"] + " ++cpus " + str(i["cpu_count"]) + " ++shell ssh\n"
            else:
                nodefile_string = nodefile_string + "host " + i["hostname"] + " ++cpus " + str(i["cpu_count"]) + " ++shell ssh\n"

        # write the nodefile to the first node via a SAGA job service
        # (self.session is expected to hold a SAGA session; it is not
        # initialized in this file)
        jd = saga.job.description()
        jd.executable = "echo"
        jd.number_of_processes = "1"
        jd.spmd_variation = "single"
        jd.arguments = ["\"" + nodefile_string + "\"", ">", "machinefile"]
        jd.output = "stdout.txt"
        jd.error = "stderr.txt"

        job_service_url = saga.url("ssh://root@" + allocated_nodes[0]["hostname"])
        job_service = saga.job.service(self.session, job_service_url)
        job = job_service.create_job(jd)
        job.run()
        job.wait()

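    # Example nodefile line in charm++ format as generated above (hostname
    # and cpu count are illustrative):
    #   host node001 ++cpus 8 ++shell ssh
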
    def print_machine_file(self, filename):
        """ log the contents of the given machinefile """
        fh = open(filename, "r")
        lines = fh.readlines()
        fh.close()
        logger.debug("Machinefile: " + filename + " Hosts: " + str(lines))

    def free_nodes(self, job_url):
        """ return the nodes of the given sub-job to the free nodes list """
        job_dict = self.coordination.get_job(job_url)
        self.resource_lock.acquire()
        machine_file_name = self.get_machine_file_name(job_dict)
        logger.debug("Machine file: " + machine_file_name)
        allocated_nodes = ["localhost\n"]
        try:
            machine_file = open(machine_file_name, "r")
            allocated_nodes = machine_file.readlines()
            machine_file.close()
        except:
            traceback.print_exc(file=sys.stderr)

        logger.debug("Free nodes: " + str(allocated_nodes))

        for i in allocated_nodes:
            logger.debug("free node: " + str(i) + " current busy nodes: " + str(self.busynodes)
                         + " free nodes: " + str(self.freenodes))
            self.busynodes.remove(i)
            self.freenodes.append(i)
        logger.debug("Delete " + machine_file_name)
        if os.path.exists(machine_file_name):
            os.remove(machine_file_name)
        self.resource_lock.release()

    def get_machine_file_name(self, job_dict):
        """ create machinefile name based on the sub-job id """
        job_id = job_dict["job-id"]
        homedir = os.path.expanduser('~')
        return homedir + "/advert-launcher-machines-" + job_id

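    # Example (illustrative): a sub-job with job-id "sj-1" is assigned the
    # machinefile $HOME/advert-launcher-machines-sj-1
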
    def dequeue_new_jobs(self):
        """ Subscribe to new jobs from the coordination subsystem (e.g. Redis). """
        job_counter = 0
        while self.is_stopped(self.base_url) == False:
            if len(self.freenodes) == 0:
                time.sleep(3)
                continue
            logger.debug("Dequeue sub-job from: " + self.base_url)
            job_url = self.coordination.dequeue_job(self.base_url)
            logger.debug("Dequeued: %s" % str(job_url))
            if job_url == None:
                time.sleep(3)
                continue
            if job_url == "STOP":
                break

            job_counter = job_counter + 1
            # wait for the thread pool to drain every THREAD_POOL_SIZE jobs
            if (job_counter % THREAD_POOL_SIZE) == 0:
                self.threadpool.wait()

            request = WorkRequest(self.start_new_job_in_thread, [job_url])
            self.threadpool.putRequest(request)

        # wait for outstanding sub-jobs before terminating
        self.threadpool.wait()
        logger.debug("Terminating Agent - Dequeue Sub-Jobs Thread")

    def start_new_job_in_thread(self, job_url):
        """ evaluate job description, perform sanity checks, execute job """
        if job_url != None:
            failed = False
            job_dict = None
            try:
                logger.debug("Get job description")
                job_dict = self.coordination.get_job(job_url)
            except:
                logger.error("Failed to get job description")
                failed = True

            if job_dict == None or failed == True:
                # re-queue the sub-job and retry later
                self.coordination.queue_job(self.base_url, job_url)
                return

            logger.debug("start job: " + job_url + " data: " + str(job_dict))
            if job_dict["state"] == str(bigjob.state.Unknown):
                job_dict["state"] = str(bigjob.state.New)
                self.coordination.set_job_state(job_url, str(bigjob.state.New))
            self.execute_job(job_url, job_dict)

    def monitor_jobs(self):
        """ Monitor running processes. """
        logger.debug("Monitor jobs - # current jobs: %d" % len(self.jobs))
        for i in self.jobs:
            if self.processes.has_key(i):  # only sub-jobs that were started on this node
                p = self.processes[i]
                p_state = p.poll()
                logger.debug(self.print_job(i) + " state: " + str(p_state) + " return code: " + str(p.returncode))
                if p_state != None and (p_state == 0 or p_state == 255):
                    logger.debug("Job successful: " + self.print_job(i))
                    self.coordination.set_job_state(i, str(bigjob.state.Done))
                    self.free_nodes(i)
                    del self.processes[i]
                elif p_state != 0 and p_state != 255 and p_state != None:
                    logger.debug("Job failed: " + self.print_job(i))
                    self.coordination.set_job_state(i, str(bigjob.state.Failed))
                    self.free_nodes(i)
                    del self.processes[i]

    def print_job(self, job_url):
        """ return a short description of the given sub-job """
        job_dict = self.coordination.get_job(job_url)
        return ("Job: " + job_url + " Executable: " + job_dict["Executable"])

    def start_background_thread(self):
        """ poll the coordination subsystem and monitor running jobs until stopped """
        self.stop = False
        logger.debug("##################################### New POLL/MONITOR cycle ##################################")
        logger.debug("Free nodes: " + str(len(self.freenodes)) + " Busy Nodes: " + str(len(self.busynodes)))
        while self.stop == False:
            if self.is_stopped(self.base_url) == True:
                logger.debug("Pilot job entry deleted - terminate agent")
                break
            else:
                logger.debug("Pilot job entry: " + str(self.base_url) + " exists. Pilot job not in state stopped.")
            try:
                self.monitor_jobs()
                time.sleep(5)
                self.failed_polls = 0
            except:
                traceback.print_exc(file=sys.stdout)
                self.failed_polls = self.failed_polls + 1
                if self.failed_polls > 3:  # terminate agent after three consecutive failed polls
                    break
        logger.debug("Terminating Agent - Background Thread")

    def is_stopped(self, base_url):
        """ check whether the pilot entry was deleted or marked as stopped """
        state = None
        try:
            state = self.coordination.get_pilot_state(base_url)
        except:
            pass
        logger.debug("Pilot State: " + str(state))
        if state == None or state.has_key("stopped") == False or state["stopped"] == True:
            return True
        else:
            return False

    def stop_background_thread(self):
        self.stop = True


if __name__ == "__main__":
    args = sys.argv
    num_args = len(args)
    if num_args != 3:
        print "Usage: \n " + args[0] + " <coordination host url> <coordination namespace url>"
        sys.exit(1)

    bigjob_agent = bigjob_agent(args)
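# Example invocation (a sketch; URLs are illustrative):
#   python bigjob_agent.py redis://localhost:6379 \
#       "redis://localhost:6379/bigjob:bj-1234-abcd:localhost"
# args[1] is the coordination backend URL; args[2] is the pilot (base) URL,
# which must contain the "bj-..." id parsed by __get_bj_id().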