1
2
3
4
5
6
7
8
# Remind the operator that this example must be launched from the cluster head node.
print("\n".join([
    "############################################################",
    "Make sure you are running this example on head node",
    "############################################################",
]))
12
13
# `sys` is imported outside the guard so the error handler below can always
# call sys.exit(), whatever else failed to import.
import sys

try:
    # NOTE(review): getopt, pdb and logging look unused in the visible part of
    # this script; kept in case other code relies on them.
    import getopt
    import saga
    import time
    import pdb
    import os
    import traceback
    import logging
except ImportError as err:
    print("Error: Libraries are missing. Please check SAGA Installation, SAGA Python bindings and other imported packages")
    print(" Error :Please launch python; import saga; if it works then python bindings & saga installation is successful")
    print("Import error: " + str(err))
    # Non-zero status so the calling shell / scheduler sees the failure.
    sys.exit(1)
28
29
30
31
32
# Make the in-repository bigjob packages (one directory above the cwd) importable.
sys.path.insert(0, os.getcwd() + "/../")
try:
    # These star imports presumably provide many_job_service and
    # COORDINATION_URL used by the driver below -- TODO confirm.
    from bigjob.bigjob_manager import *
    from bigjob_dynamic.many_job import *
except ImportError as err:
    print("Error: SAGA BigJob & ManyJob modules are not found")
    print("Import error: " + str(err))
    # Nothing below can run without these modules; stop instead of failing
    # later with a confusing NameError.
    sys.exit(1)
39
40
41
42
43
# Number of /bin/date sub-jobs submitted through the ManyJob service.
NUMBER_JOBS = 8

# Probe the Globus setup by running a trivial job on localhost; a non-zero
# exit status is treated as "no valid grid proxy".
# NOTE(review): the script only warns and keeps going, so later submissions
# will most likely fail if the proxy really is missing.
grid_validity = os.system("globus-job-run localhost /bin/date > /dev/null 2>&1")
if grid_validity != 0:
    print(" Grid proxy not initiated. Please use grid-proxy-init to initiate grid proxy")
52
53
54
55
56
57
def has_finished(state):
    """Return True if a sub-job state is terminal.

    The comparison is case-insensitive against "done", "failed" and
    "canceled". ``state`` is converted with ``str()`` first, so ``None`` or a
    non-string state object yields False instead of raising AttributeError.
    """
    if state is None:
        return False
    return str(state).lower() in ("done", "failed", "canceled")
64
65
66
def _make_resource(working_directory):
    """Return one pilot-job resource description (eric1.loni.org, PBS queue "checkpt").

    A fresh dict is built on every call, so list entries never alias each other.
    """
    return {"resource_url": "gram://eric1.loni.org/jobmanager-pbs",
            "number_of_processes": "4", "processes_per_node": "4",
            "allocation": None, "queue": "checkpt",
            "working_directory": working_directory,
            "walltime": 20}


def _build_resource_list(number_of_pilots=2):
    """Return the resource descriptions handed to ``many_job_service``.

    By default two identical pilots are requested; each agent works in
    ``<cwd>/agent``.
    """
    agent_dir = os.getcwd() + "/agent"
    return [_make_resource(agent_dir) for _ in range(number_of_pilots)]


def _submit_jobs(mjs, number_jobs):
    """Create and start ``number_jobs`` single-process /bin/date sub-jobs.

    Each job i writes stdout-i.txt / stderr-i.txt in the current directory.
    Returns the started sub-jobs in submission order.
    """
    cwd = os.getcwd()
    jobs = []
    for i in range(number_jobs):
        jd = saga.job.description()
        jd.executable = "/bin/date"
        jd.number_of_processes = "1"
        jd.spmd_variation = "single"
        jd.arguments = [""]
        jd.working_directory = cwd
        jd.output = "stdout-" + str(i) + ".txt"
        jd.error = "stderr-" + str(i) + ".txt"
        subjob = mjs.create_job(jd)
        subjob.run()
        print("Submited sub-job " + str(i) + ".")
        jobs.append(subjob)
    return jobs


def _print_job_row(job, state):
    """Print one status-table row (id, state, machine, executable, arguments) for ``job``."""
    k = str(job)
    if k == "None":
        return
    # The code reads field 4 of str(job) as the job id and field 2 as the machine.
    fields = k.split(":")
    if len(fields) > 4:
        jobid, machine = fields[4], fields[2]
    else:
        # Unexpected format: show it verbatim rather than abort the whole run.
        jobid, machine = k, ""
    exe = job.get_exe()
    arguments = job.get_arguments()
    print(str(jobid) + " " + "%7s %10s %10s,%6s" % (str(state), str(machine), str(exe), str(arguments)))


def _wait_for_jobs(jobs, poll_interval=5):
    """Poll ``jobs`` and print a status table until every job reaches a terminal state.

    The table is reprinted, followed by a per-state count, every
    ``poll_interval`` seconds. There is no sleep after the final check, so
    finished runs are not delayed.
    """
    while True:
        finish_counter = 0
        result_map = {}
        print("JOB_ID Status Machine Job_Attributes")
        print("==========================================================")
        for job in jobs:
            state = job.get_state()
            result_map[state] = result_map.get(state, 0) + 1
            _print_job_row(job, state)
            if has_finished(state):
                finish_counter += 1
        print("Current states: " + str(result_map))
        if finish_counter == len(jobs):
            break
        time.sleep(poll_interval)


if __name__ == "__main__":
    # Bound before the try so the error handler can tell whether pilots exist.
    mjs = None
    try:
        print(" Number of jobs submitted " + str(NUMBER_JOBS) + " jobs.")
        starttime = time.time()

        resource_list = _build_resource_list()

        print("Create manyjob service ")
        # NOTE(review): COORDINATION_URL is not defined in the visible code;
        # presumably it comes from the bigjob star import -- confirm.
        mjs = many_job_service(resource_list, COORDINATION_URL)

        jobs = _submit_jobs(mjs, NUMBER_JOBS)
        print("************************ All Jobs submitted ************************")

        _wait_for_jobs(jobs)

        print(" All Jobs completed. Cleaning up the bigjob...")
        mjs.cancel()
        runtime = time.time() - starttime
        print("Runtime: " + str(runtime) + " s; Runtime per Job: " + str(runtime / NUMBER_JOBS))
    except (Exception, KeyboardInterrupt):
        # KeyboardInterrupt is included so Ctrl-C still releases the pilot jobs.
        traceback.print_exc(file=sys.stdout)
        if mjs is not None:
            try:
                mjs.cancel()
            except Exception:
                # Best-effort cleanup: report the failure instead of hiding it.
                traceback.print_exc(file=sys.stdout)
        sys.exit(1)
168