"""bigjob_agent: bigjob agent adapted for Condor resources
"""
import sys
import os
import bigjob.state
import socket
import threading
import time
import pdb
import traceback
import ConfigParser
import types
import logging
logging.basicConfig(level=logging.DEBUG)

# SAGA is optional: only needed for the charm++ nodefile setup path
try:
    import saga
except:
    logging.warning("SAGA could not be found. Not all functionalities working")

# make the bundled threadpool package importable
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../ext/threadpool-1.2.7/src/")
logging.debug(str(sys.path))
from threadpool import *

# compatibility shims for very old interpreters
if sys.version_info < (2, 5):
    sys.path.append(os.path.dirname(__file__) + "/../../ext/uuid-1.30/")
    sys.stderr.write("Warning: Using unsupported Python version\n")
if sys.version_info < (2, 4):
    sys.path.append(os.path.dirname(__file__) + "/../../ext/subprocess-2.6.4/")
    sys.stderr.write("Warning: Using unsupported Python version\n")
if sys.version_info < (2, 3):
    sys.stderr.write("Warning: Python versions <2.3 not supported\n")
    sys.exit(-1)

import subprocess
37
38 """ Config parameters (will move to config file in future) """
39 CONFIG_FILE="bigjob_agent.conf"
40 THREAD_POOL_SIZE=4
41 APPLICATION_NAME="bigjob"
42
44
45 """BigJob Agent:
46 - reads new job information from communication and coordination subsystem (Redis)
47 - starts new jobs
48 - monitors running jobs """
49
50 """Constructor"""
52
53 self.coordination_url = args[1]
54
55 self.jobs = []
56 self.processes = {}
57 self.freenodes = []
58 self.busynodes = []
59 self.restarted = {}
60
61
62
63
64 conf_file = os.path.dirname(os.path.abspath( __file__ )) + "/../" + CONFIG_FILE
65 config = ConfigParser.ConfigParser()
66 logging.debug ("read configfile: " + conf_file)
67 config.read(conf_file)
68 default_dict = config.defaults()
69 self.CPR = default_dict["cpr"]
70 self.SHELL=default_dict["shell"]
71 self.MPIRUN=default_dict["mpirun"]
72 logging.debug("cpr: " + self.CPR + " mpi: " + self.MPIRUN + " shell: " + self.SHELL)
73
74
75 self.init_rms()
76
77 self.failed_polls = 0
78
79
80
81
82 self.base_url = args[2]
83 logging.debug("BigJob Agent arguments: " + str(args))
84 logging.debug("Initialize C&C subsystem to pilot-url: " + self.base_url)
85
86
87 if(self.coordination_url.startswith("advert://")):
88 try:
89 from coordination.bigjob_coordination_advert import bigjob_coordination
90 logging.debug("Utilizing ADVERT Backend: " + self.coordination_url)
91 except:
92 logging.error("Advert Backend could not be loaded")
93 elif (self.coordination_url.startswith("redis://")):
94 try:
95 from coordination.bigjob_coordination_redis import bigjob_coordination
96 logging.debug("Utilizing Redis Backend: " + self.coordination_url + ". Please make sure Redis server is configured in bigjob_coordination_redis.py")
97 except:
98 logging.error("Error loading pyredis.")
99 elif (self.coordination_url.startswith("tcp://")):
100 try:
101 from coordination.bigjob_coordination_zmq import bigjob_coordination
102 logging.debug("Utilizing ZMQ Backend")
103 except:
104 logging.error("ZMQ Backend not found. Please install ZeroMQ (http://www.zeromq.org/intro:get-the-software) and "
105 +"PYZMQ (http://zeromq.github.com/pyzmq/)")
106
107 self.coordination = bigjob_coordination(server_connect_url=self.coordination_url)
108
109
110 self.coordination.set_pilot_state(self.base_url, str(bigjob.state.Running), False)
111
112
113
114
115 self.resource_lock=threading.RLock()
116 self.threadpool = ThreadPool(THREAD_POOL_SIZE)
117
118 self.launcher_thread=threading.Thread(target=self.dequeue_new_jobs)
119 self.launcher_thread.start()
120
121 self.monitoring_thread=threading.Thread(target=self.start_background_thread)
122 self.monitoring_thread.start()
123
124
125
def init_rms(self):
    """Detect the resource management system from the environment and
    delegate to the matching node-list initializer (PBS, SGE or local fork)."""
    if os.environ.get("PBS_NODEFILE") != None:
        return self.init_pbs()
    elif os.environ.get("PE_HOSTFILE") != None:
        return self.init_sge()
    else:
        return self.init_local()
134
136 """ initialize free nodes list with dummy (for fork jobs)"""
137 try:
138 num_cpus = self.get_num_cpus()
139 for i in range(0, num_cpus):
140 self.freenodes.append("localhost\n")
141 except IOError:
142 self.freenodes=["localhost\n"]
143
145 """ initialize free nodes list from SGE environment """
146 sge_node_file = os.environ.get("PE_HOSTFILE")
147 if sge_node_file == None:
148 return
149 f = open(sge_node_file)
150 sgenodes = f.readlines()
151 f.close()
152 for i in sgenodes:
153
154 columns = i.split()
155 try:
156 for j in range(0, int(columns[1])):
157 logging.debug("add host: " + columns[0].strip())
158 self.freenodes.append(columns[0]+"\n")
159 except:
160 pass
161 return self.freenodes
162
164 """ initialize free nodes list from PBS environment """
165 pbs_node_file = os.environ.get("PBS_NODEFILE")
166 if pbs_node_file == None:
167 return
168 f = open(pbs_node_file)
169 self.freenodes = f.readlines()
170 f.close()
171
172
173 num_cpus = self.get_num_cpus()
174 node_dict={}
175 for i in set(self.freenodes):
176 node_dict[i] = self.freenodes.count(i)
177 if node_dict[i] < num_cpus:
178 node_dict[i] = num_cpus
179
180 self.freenodes=[]
181 for i in node_dict.keys():
182 logging.debug("host: " + i + " nodes: " + str(node_dict[i]))
183 for j in range(0, node_dict[i]):
184 logging.debug("add host: " + i.strip())
185 self.freenodes.append(i)
186
def get_num_cpus(self):
    """Count processors listed in /proc/cpuinfo (Linux only).

    Raises IOError where /proc/cpuinfo does not exist; callers
    (init_local) rely on that to fall back to a single slot.
    """
    cpuinfo = open("/proc/cpuinfo", "r")
    cpus = cpuinfo.readlines()
    cpuinfo.close()
    num = 0
    for i in cpus:
        if i.startswith("processor"):
            num = num + 1
    return num
196
197
199 """ obtain job attributes from c&c and execute process """
200 state=str(job_dict["state"])
201
202
203
204
205
206
207
208
209
210 if(state==str(bigjob.state.Unknown) or
211 state==str(bigjob.state.New)):
212 try:
213
214 logging.debug("Start job: " + str(job_dict))
215 numberofprocesses = "1"
216 if (job_dict.has_key("NumberOfProcesses") == True):
217 numberofprocesses = job_dict["NumberOfProcesses"]
218
219 spmdvariation="single"
220 if (job_dict.has_key("SPMDVariation") == True):
221 spmdvariation = job_dict["SPMDVariation"]
222
223 arguments = ""
224 if (job_dict.has_key("Arguments") == True):
225 arguments_raw = job_dict['Arguments'];
226 if type(arguments_raw) == types.ListType:
227 arguments_list = arguments_raw
228 else:
229 arguments_list = eval(job_dict["Arguments"])
230 for i in arguments_list:
231 arguments = arguments + " " + i
232
233 workingdirectory = os.getcwd()
234 if (job_dict.has_key("WorkingDirectory") == True):
235 workingdirectory = job_dict["WorkingDirectory"]
236
237 environment = os.environ
238 if (job_dict.has_key("Environment") == True):
239 for i in job_dict["Environment"]:
240 env = i.split("=")
241 environment[env[0]]=env[1] + ":" + environment[env[0]]
242
243 environment["PATH"]= workingdirectory + ":"+environment["PATH"]
244 print "environment[PATH]", environment["PATH"]
245 executable = job_dict["Executable"]
246
247
248 output="stdout"
249 if (job_dict.has_key("Output") == True):
250 output = job_dict["Output"]
251
252 error="stderr"
253 if (job_dict.has_key("Error") == True):
254 error = job_dict["Error"]
255
256
257 self.jobs.append(job_url)
258
259
260 output_file = os.path.join(workingdirectory, output)
261 error_file = os.path.join(workingdirectory, error)
262 logging.debug("stdout: " + output_file + " stderr: " + error_file + " env: " + str(environment))
263 stdout = open(output_file, "w")
264 stderr = open(error_file, "w")
265
266
267 command = executable + " " + arguments
268
269
270 machinefile = self.allocate_nodes(job_dict)
271 host = "localhost"
272 try:
273 machine_file_handler = open(machinefile, "r")
274 node= machine_file_handler.readlines()
275 machine_file_handler.close()
276 host = node[0].strip()
277 except:
278 pass
279
280
281 if(machinefile==None):
282 logging.debug("Not enough resources to run: " + job_url)
283 self.coordination.queue_job(self.base_url, job_url)
284 return
285
286
287 if (spmdvariation.lower( )=="mpi"):
288 command = "cd " + workingdirectory + "; " + self.MPIRUN + " -np " + numberofprocesses + " -machinefile " + machinefile + " " + command
289
290
291 else:
292 command ="chmod +x " + executable +";export PATH=$PATH:" + workingdirectory + ";" +command
293 shell = self.SHELL
294 logging.debug("execute: " + command + " in " + workingdirectory + " from: " + str(socket.gethostname()) + " (Shell: " + shell +")")
295
296 p = subprocess.Popen(args=command, executable=shell, stderr=stderr,
297 stdout=stdout, cwd=workingdirectory,
298 env=environment, shell=True)
299 logging.debug("started " + command)
300
301
302 dirlist = os.listdir(workingdirectory)
303 print dirlist
304 os.system("ls;pwd")
305
306 self.processes[job_url] = p
307 self.coordination.set_job_state(job_url, str(bigjob.state.Running))
308 except:
309 traceback.print_exc(file=sys.stderr)
310
311
313 """ allocate nodes
314 allocated nodes will be written to machinefile advert-launcher-machines-<jobid>
315 """
316 self.resource_lock.acquire()
317 number_nodes = int(job_dict["NumberOfProcesses"])
318 nodes = []
319 machine_file_name = None
320 if (len(self.freenodes)>=number_nodes):
321 unique_nodes=set(self.freenodes)
322 for i in unique_nodes:
323 number = self.freenodes.count(i)
324 logging.debug("allocate: " + i + " number nodes: " + str(number)
325 + " current busy nodes: " + str(self.busynodes)
326 + " free nodes: " + str(self.freenodes))
327 for j in range(0, number):
328 if(number_nodes > 0):
329 nodes.append(i)
330 self.freenodes.remove(i)
331 self.busynodes.append(i)
332 number_nodes = number_nodes - 1
333 else:
334 break
335
336 machine_file_name = self.get_machine_file_name(job_dict)
337 machine_file = open(machine_file_name, "w")
338
339 machine_file.writelines(nodes)
340 machine_file.close()
341 logging.debug("wrote machinefile: " + machine_file_name + " Nodes: " + str(nodes))
342
343
344
345
346 self.resource_lock.release()
347 return machine_file_name
348
349
350
352 """ Setup charm++ nodefile to use for executing NAMD
353 HACK!! Method violates layering principle
354 File $HOME/machinefile in charm++ nodefileformat is written to first node in list
355 """
356
357
358
359
360
361 nodefile_string=""
362 for i in allocated_nodes:
363 if i.has_key("private_hostname"):
364 nodefile_string=nodefile_string + "host "+ i["private_hostname"] + " ++cpus " + str(i["cpu_count"]) + " ++shell ssh\n"
365 else:
366 nodefile_string=nodefile_string + "host "+ i["hostname"] + " ++cpus " + str(i["cpu_count"]) + " ++shell ssh\n"
367
368
369 jd = saga.job.description()
370 jd.executable = "echo"
371 jd.number_of_processes = "1"
372 jd.spmd_variation = "single"
373
374 jd.arguments = ["\""+nodefile_string+"\"", ">", "machinefile"]
375 jd.output = "stdout.txt"
376 jd.error = "stderr.txt"
377
378 job_service_url = saga.url("ssh://root@"+allocated_nodes[0]["hostname"])
379 job_service = saga.job.service(self.session, job_service_url)
380 job = job_service.create_job(jd)
381 job.run()
382 job.wait()
383
def print_machine_file(self, filename):
    """Log the contents of a machinefile (debug helper)."""
    fh = open(filename, "r")
    lines = fh.readlines()
    fh.close()  # BUG FIX: was 'fh.close' without parentheses -- file handle leaked
    logging.debug("Machinefile: " + filename + " Hosts: " + str(lines))
389
def free_nodes(self, job_url):
    """Return a finished job's nodes from busy to the free pool and
    delete its machinefile."""
    job_dict = self.coordination.get_job(job_url)
    self.resource_lock.acquire()
    try:
        machine_file_name = self.get_machine_file_name(job_dict)
        logging.debug("Machine file: " + machine_file_name)
        # fall back to localhost if the machinefile cannot be read
        allocated_nodes = ["localhost\n"]
        try:
            machine_file = open(machine_file_name, "r")
            allocated_nodes = machine_file.readlines()
            machine_file.close()
        except:
            traceback.print_exc(file=sys.stderr)

        logging.debug("Free nodes: " + str(allocated_nodes))
        for i in allocated_nodes:
            logging.debug("free node: " + str(i) + " current busy nodes: " + str(self.busynodes)
                          + " free nodes: " + str(self.freenodes))
            self.busynodes.remove(i)
            self.freenodes.append(i)
        logging.debug("Delete " + machine_file_name)
        if os.path.exists(machine_file_name):
            os.remove(machine_file_name)
    finally:
        # always release the lock, even if a node was not found in busynodes
        self.resource_lock.release()
415
416
418 """create machinefile based on jobid"""
419 job_id = job_dict["job-id"]
420 homedir = os.path.expanduser('~')
421 return homedir + "/advert-launcher-machines-"+ job_id
422
424 """Subscribe to new jobs from Redis. """
425 job_counter = 0
426 while self.is_stopped(self.base_url)==False:
427 if len(self.freenodes)==0:
428 time.sleep(3)
429 continue
430 logging.debug("Dequeue sub-job from: " + self.base_url)
431 job_url=self.coordination.dequeue_job(self.base_url)
432 if job_url==None:
433 time.sleep(3)
434 continue
435 if job_url=="STOP":
436 break
437
438 job_counter = job_counter + 1
439 if (job_counter % (THREAD_POOL_SIZE))==0:
440 self.threadpool.wait()
441
442 request = WorkRequest(self.start_new_job_in_thread, [job_url])
443 self.threadpool.putRequest(request)
444
445
446 self.threadpool.wait()
447 logging.debug("Terminating Agent - Dequeue Sub-Jobs Thread")
448
449
450
451
452
453
454
455
456
457
459 """evaluates job dir, sanity checks, executes job """
460
461 if job_url != None:
462 failed = False;
463 try:
464 job_dict = self.coordination.get_job(job_url)
465 except:
466 failed=True
467
468 if job_dict==None or failed==True:
469 self.coordination.queue_job(self.pilot_url, job_url)
470
471 logging.debug("start job: " + job_url + " data: " + str(job_dict))
472 if(job_dict["state"]==str(bigjob.state.Unknown)):
473 job_dict["state"]=str(bigjob.state.New)
474 self.coordination.set_job_state(job_url, str(bigjob.state.New))
475 self.execute_job(job_url, job_dict)
476
477
479 """Monitor running processes. """
480
481 logging.debug("Monitor jobs - # current jobs: %d"%len(self.jobs))
482 for i in self.jobs:
483 if self.processes.has_key(i):
484 p = self.processes[i]
485 p_state = p.poll()
486 logging.debug(self.print_job(i) + " state: " + str(p_state) + " return code: " + str(p.returncode))
487 if (p_state != None and (p_state==0 or p_state==255)):
488 logging.debug("Job successful: " + self.print_job(i))
489 self.coordination.set_job_state(i, str(bigjob.state.Done))
490
491 self.free_nodes(i)
492 del self.processes[i]
493 elif p_state!=0 and p_state!=255 and p_state != None:
494 logging.debug(self.print_job(i) + " failed. ")
495
496
497
498
499
500
501
502 logging.debug("Job failed " + self.print_job(i))
503 self.coordination.set_job_state(i, str(bigjob.state.Failed))
504 self.free_nodes(i)
505 del self.processes[i]
506
def print_job(self, job_url):
    """Return a short human-readable description of a sub-job."""
    job_dict = self.coordination.get_job(job_url)
    # NOTE(review): "Excutable" typo kept -- the string may be parsed/grepped downstream
    return ("Job: " + job_url
            + " Excutable: " + job_dict["Executable"])
511
512
def start_background_thread(self):
    """Poll/monitor loop: watch sub-jobs every 5s until the pilot is
    stopped, self.stop is set, or more than 3 consecutive polls fail."""
    self.stop = False
    logging.debug("##################################### New POLL/MONITOR cycle ##################################")
    logging.debug("Free nodes: " + str(len(self.freenodes)) + " Busy Nodes: " + str(len(self.busynodes)))
    while self.stop == False:
        if self.is_stopped(self.base_url) == True:
            logging.debug("Pilot job entry deleted - terminate agent")
            break
        else:
            logging.debug("Pilot job entry: " + str(self.base_url) + " exists. Pilot job not in state stopped.")
        try:
            self.monitor_jobs()
            time.sleep(5)
            self.failed_polls = 0
        except:
            traceback.print_exc(file=sys.stdout)
            # tolerate transient failures, but give up after 3 in a row
            self.failed_polls = self.failed_polls + 1
            if self.failed_polls > 3:
                break
    logging.debug("Terminating Agent - Background Thread")
534
535
def is_stopped(self, base_url):
    """Return True if the pilot entry is gone, unreadable, lacks a
    "stopped" flag, or is explicitly marked stopped."""
    state = None
    try:
        state = self.coordination.get_pilot_state(base_url)
    except:
        pass
    logging.debug("Pilot State: " + str(state))
    if state == None or "stopped" not in state or state["stopped"] == True:
        return True
    else:
        return False
547
548
def stop_background_thread(self):
    """Signal the poll/monitor loop to terminate on its next iteration."""
    self.stop = True
551
552
553
554
555
if __name__ == "__main__":
    args = sys.argv
    num_args = len(args)
    if num_args != 3:
        # expects: <coordination url> <pilot url>
        print("Usage: \n " + args[0] + " <coordination host url> <coordination namespace url>")
        sys.exit(1)

    bigjob_agent = bigjob_agent(args)
564