| Trees | Indices | Help |
|
|---|
|
|
1 #!/usr/bin/env python
2
3
4 #############################################################
5 # SAGA ManyJob
6 #############################################################
7
8
# Remind the user where this example is meant to be launched.
print("############################################################")
print("Make sure you are running this example on head node")
print("############################################################")

# Import required packages and check for their availability.
# `sys` is imported outside the guard so that sys.exit() is always
# available to the handler below.
import sys

try:
    import getopt
    import saga
    import time
    import pdb
    import os
    import traceback
    import logging
except ImportError:
    # Only a failed import is treated as "libraries are missing"; any other
    # exception is a different problem and is allowed to propagate.
    print("Error: Libraries are missing. Please check SAGA Installation, SAGA Python bindings and other imported packages")
    print(" Error :Please launch python; import saga; if it works then python bindings & saga installation is successful")
    # Exit with a non-zero status so callers (shells, job scripts) can tell
    # that the example did not run.
    sys.exit(1)
28
29 #############################################################
30 # Set BigJob Path
31 #############################################################
# For running BigJob from a local directory: put the parent of the current
# working directory at the front of the module search path.
sys.path.insert(0, os.getcwd() + "/../")
try:
    from bigjob.bigjob_manager import *
    from bigjob_dynamic.many_job import *
except ImportError:
    print("Error: SAGA BigJob & ManyJob modules are not found")
    # The rest of the script calls many_job_service() from these modules, so
    # continuing would only fail later with a NameError. Stop here with a
    # non-zero status, like the library check at the top of the file.
    sys.exit(1)
39
40 #############################################################
41 # Set number of jobs
42 #############################################################
43
# Number of sub-jobs submitted by the main section of this script.
NUMBER_JOBS = 8

#############################################################
# Grid certificate validation
#############################################################
# Run a trivial command through globus-job-run on localhost, discarding its
# output. os.system() returns the command's exit status; anything other than
# 0 is reported as a missing grid proxy. This is only a warning: the script
# keeps going either way.
grid_validity = os.system("globus-job-run localhost /bin/date > /dev/null 2>&1")
if grid_validity != 0:
    print(" Grid proxy not initated. Please use grid-proxy-init to initate grid proxy")
52
53
54 ############################################################
55 # Functions used to determine state of jobs
56 ############################################################
57
64
65
66
def format_job_row(job, state):
    """Return the status-table row for one sub-job, or None if it has no id yet.

    The sub-job's string form is split on ":"; element 4 is shown as the job
    id and element 2 as the machine. A sub-job whose string form is the text
    "None" is skipped, and its executable and arguments are not queried.

    job   -- sub-job object providing __str__, get_exe() and get_arguments()
    state -- state value already fetched for this sub-job
    """
    job_key = str(job)
    # Compare by value: an identity test against a string literal does not
    # reliably detect a freshly built "None" string.
    if job_key == "None":
        return None
    fields = job_key.split(":")
    jobid = fields[4]
    machine = fields[2]
    return str(jobid) + " " + "%7s %10s %10s,%6s" % (
        str(state), str(machine), str(job.get_exe()), str(job.get_arguments()))


if __name__ == "__main__":
    # NOTE(review): has_finished() and COORDINATION_URL are not defined in the
    # visible part of this listing (its numbering skips lines 58-63).
    # Presumably they come from that collapsed section or from the BigJob
    # star-imports -- confirm.

    # Bound before the try block so the error handler can tell whether a
    # manyjob service was ever created.
    mjs = None
    try:
        print(" Number of jobs submitted " + str(NUMBER_JOBS) + " jobs.")
        starttime = time.time()

        resource_list = []
        # Alternative resource descriptions, kept for reference (disabled):
        #resource_list.append( {"resource_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_nodes" : "64", "allocation" : "<your allocation>", "queue" : "workq", "bigjob_agent": (os.getcwd() + "/bigjob_agent_launcher.sh") , "working_directory": (os.getcwd() + "/agent"), "walltime":10 })
        #resource_list.append({"resource_url" : "gram://oliver1.loni.org/jobmanager-pbs", "number_nodes" : "1", "processes_per_node":"4", "allocation" : None, "queue" : None, "bigjob_agent": (BIGJOB_HOME + "/bigjob_agent_launcher.sh"), "working_directory": (os.getcwd() + "/agent"), "walltime":30 })
        #resource_list.append({"resource_url" : "gram://eric1.loni.org/jobmanager-pbs", "number_nodes" : "2", "processes_per_node":"4", "allocation" : None, "queue" : "checkpt", "bigjob_agent": (BIGJOB_HOME + "/bigjob_agent_launcher.sh"), "working_directory": (os.getcwd() + "/agent"), "walltime":4320 })

        # resource_url - Machine on which you want to run the job.
        # number_of_processes - total number of processes requested on that machine.
        # processes_per_node
        # allocation - if None then default allocation is used.
        # queue - if None then default queue is used.

        resource_list.append({"resource_url" : "gram://eric1.loni.org/jobmanager-pbs",
                              "number_of_processes" : "4", "processes_per_node":"4",
                              "allocation" : None, "queue" : "checkpt",
                              "working_directory": (os.getcwd() + "/agent"),
                              "walltime":20 })

        resource_list.append({"resource_url" : "gram://eric1.loni.org/jobmanager-pbs",
                              "number_of_processes" : "4", "processes_per_node":"4",
                              "allocation" : None, "queue" : "checkpt",
                              "working_directory": (os.getcwd() + "/agent"),
                              "walltime":20 })

        print("Create manyjob service ")
        mjs = many_job_service(resource_list, COORDINATION_URL)
        jobs = []
        # Submission time and last observed state per sub-job. Nothing below
        # reads them back; they are kept as bookkeeping for ad-hoc per-job
        # diagnostics (state changes, per-job runtime).
        job_start_times = {}
        job_states = {}

        # Describe, create and start NUMBER_JOBS identical /bin/date sub-jobs.
        for i in range(0, NUMBER_JOBS):
            jd = saga.job.description()
            jd.executable = "/bin/date"
            jd.number_of_processes = "1"
            jd.spmd_variation = "single"
            jd.arguments = [""]
            jd.working_directory = os.getcwd()
            jd.output = "stdout-" + str(i) + ".txt"
            jd.error = "stderr-" + str(i) + ".txt"
            subjob = mjs.create_job(jd)
            subjob.run()
            print("Submited sub-job " + str(i) + ".")
            jobs.append(subjob)
            job_start_times[subjob] = time.time()
            job_states[subjob] = subjob.get_state()

        print("************************ All Jobs submitted ************************")
        # Poll every sub-job, print a status table and a per-state tally, and
        # stop once has_finished() reports true for all of them.
        while True:
            finish_counter = 0
            result_map = {}
            print("JOB_ID Status Machine Job_Attributes")
            print("==========================================================")
            for job in jobs:
                state = job.get_state()
                result_map[state] = result_map.get(state, 0) + 1
                row = format_job_row(job, state)
                if row is not None:
                    print(row)
                if has_finished(state):
                    finish_counter = finish_counter + 1
                job_states[job] = state

            print("Current states: " + str(result_map))
            # Check for completion before sleeping so the final round does
            # not add a needless 5 s to the reported runtime.
            if finish_counter == NUMBER_JOBS:
                break
            time.sleep(5)

        print(" All Jobs completed. Cleaning up the bigjob..." )
        mjs.cancel()
        runtime = time.time() - starttime
        print("Runtime: " + str(runtime) + " s; Runtime per Job: " + str(runtime / NUMBER_JOBS))
    except:
        # Deliberately broad: whatever interrupts the run (including Ctrl-C),
        # report it and still try to cancel the manyjob service.
        traceback.print_exc(file=sys.stdout)
        if mjs is not None:
            try:
                mjs.cancel()
            except Exception:
                # Cleanup stays best effort, but say why it failed instead of
                # hiding the error.
                traceback.print_exc(file=sys.stdout)
| Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0.1 on Tue Jan 3 11:44:12 2012 | http://epydoc.sourceforge.net |