"""
Dynamic BigJob (ManyJob) Example

This module is used to launch a set of BigJobs.

DON'T EDIT THIS FILE (UNLESS THERE IS A BUG)

THIS FILE SHOULD NOT BE COMMITTED TO SVN WITH USER-SPECIFIC PATHS!

"""

import sys
import getopt
import time
import pdb
import os
import traceback
import logging

# Make the BigJob modules in the parent directory importable
sys.path.insert(0, os.getcwd() + "/../")

from bigjob import bigjob, subjob, description
from bigjob_dynamic.many_job import *

""" This variable defines the coordination system that is used by BigJob,
    e.g.
        advert://localhost (SAGA/Advert SQLITE)
        advert://advert.cct.lsu.edu:8080 (SAGA/Advert POSTGRESQL)
        redis://localhost:6379 (Redis at localhost)
        tcp://localhost (ZMQ)
"""
COORDINATION_URL = "advert://localhost"
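# The coordination backend can be switched by changing the URL above; for
# example, a local Redis server (if one is running on the default port) could
# be selected instead with:
# COORDINATION_URL = "redis://localhost:6379"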

NUMBER_JOBS = 8

44
def has_finished(state):
    """ Return True if the given sub-job state is terminal. """
    state = state.lower()
    if state=="done" or state=="failed" or state=="canceled":
        return True
    else:
        return False

def main():
    try:
        print "ManyJob load test with " + str(NUMBER_JOBS) + " jobs."
        starttime = time.time()

        """ Example: submit to a remote PBS resource via the mj abstraction, e.g.

        resource_list.append( {"resource_url" : "gram://eric1.loni.org/jobmanager-pbs", "processes_per_node":"4",
                               "number_of_processes" : "4", "allocation" : None, "queue" : "workq",
                               "working_directory": (os.getcwd() + "/agent"), "walltime":10 })
        """
        resource_list = []
        resource_dictionary = {"resource_url" : "fork://localhost/", "number_of_processes" : "32",
                               "processes_per_node":"1", "allocation" : None, "queue" : None,
                               "working_directory": (os.getcwd() + "/agent"), "walltime":3600 }
        resource_list.append(resource_dictionary)
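        # Further resource dictionaries (e.g. the gram:// example in the
        # docstring above) could be appended to resource_list here; resources
        # can also be added at runtime via mjs.add_resource(), as demonstrated
        # further below.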

        # Flags controlling the dynamic add/remove of resources below
        add_additional_resources = True
        remove_additional_resources = False

        print "Create Dynamic BigJob Service"
        mjs = many_job_service(resource_list, COORDINATION_URL)

        jobs = []
        job_start_times = {}
        job_states = {}
        cwd = os.getcwd()

        # Submit NUMBER_JOBS sub-jobs through the ManyJob service
        for i in range(0, NUMBER_JOBS):
            jd = description()
            jd.executable = "/bin/date"
            jd.number_of_processes = "1"
            jd.spmd_variation = "single"
            jd.arguments = [""]
            jd.working_directory = os.getcwd()
            jd.output = "stdout-" + str(i) + ".txt"
            jd.error = "stderr-" + str(i) + ".txt"
            subjob = mjs.create_job(jd)
            subjob.run()
            print "Submitted sub-job " + "%d"%i + "."
            jobs.append(subjob)
            job_start_times[subjob] = time.time()
            job_states[subjob] = subjob.get_state()
        print "************************ All Jobs submitted ************************"
        # Poll sub-job states until every job has reached a terminal state
        while 1:
            finish_counter = 0
            result_map = {}
            for i in range(0, NUMBER_JOBS):
                old_state = job_states[jobs[i]]
                state = jobs[i].get_state()
                if result_map.has_key(state) == False:
                    result_map[state] = 0
                result_map[state] = result_map[state] + 1

                if old_state != state:
                    print "Job " + str(jobs[i]) + " changed from: " + old_state + " to " + state
                if old_state != state and has_finished(state) == True:
                    print "Job: " + str(jobs[i]) + " Runtime: " + str(time.time()-job_start_times[jobs[i]]) + " s."
                if has_finished(state) == True:
                    finish_counter = finish_counter + 1
                job_states[jobs[i]] = state

            # Dynamically add an additional resource after 10 seconds
            if time.time()-starttime > 10 and add_additional_resources == True:
                print "***add additional resources***"
                mjs.add_resource(resource_dictionary)
                add_additional_resources = False

            # Dynamically remove a resource after 15 seconds (disabled by default)
            if time.time()-starttime > 15 and remove_additional_resources == True:
                bj_list = mjs.get_resources()
                if len(bj_list) > 0:
                    print "***remove resources: " + str(bj_list[0])
                    mjs.remove_resource(bj_list[0])
                remove_additional_resources = False

            print "Current states: " + str(result_map)
            time.sleep(5)
            if finish_counter == NUMBER_JOBS:
                break

        mjs.cancel()
        runtime = time.time() - starttime
        print "Runtime: " + str(runtime) + " s; Runtime per Job: " + str(runtime/NUMBER_JOBS)
    except:
        traceback.print_exc(file=sys.stdout)
        try:
            mjs.cancel()
        except:
            pass

""" Test Job Submission via ManyJob abstraction """
if __name__ == "__main__":
    main()
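
# Note: this example uses Python 2 print statements, so it would typically be
# run with a Python 2 interpreter, assuming the bigjob / bigjob_dynamic modules
# are importable (see the sys.path.insert above) and the coordination backend
# configured in COORDINATION_URL is reachable.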