1 import saga
2 import os
3 import time
4 import sys
5 import pdb
6 import datetime
7
8 from bigjob.bigjob_manager import bigjob, subjob
9
10
11
12
13
14
15
# Coordination URL handed to bigjob() for every test scenario.
COORDINATION_URL = "tcp://localhost"

# Test scenarios, paired by index: scenario i submits NUMBER_JOBS[i] sub-jobs
# to a pilot of NUMBER_NODES[i] nodes, so both lists must have equal length.
NUMBER_JOBS = [32]
NUMBER_NODES = [2]

# Passed to the pilot as processes_per_node.
NUMBER_CORES_PER_NODE = 1
# Directory that receives the CSV result files.
RESULT_DIR = "results"
# Derived from RESULT_DIR so the directory that is created and the directory
# that is written to can never drift apart.
RESULT_FILE_PREFIX = os.path.join(RESULT_DIR, "results-")
# Resource-manager URL the pilot job is started on.
LRMS_URL = "fork://localhost"
26
27
def has_finished(state):
    """Return True if ``state`` names a terminal job state.

    The check is case-insensitive: "done", "failed" and "canceled" (in any
    capitalisation) are terminal; every other state string is not.
    """
    return state.lower() in ("done", "failed", "canceled")
34
35
def _record_queueing_time(pilot_state, queueing_time, starttime):
    """Return the pilot queueing time, measuring it the first time the pilot runs.

    If ``pilot_state`` is "Running" and ``queueing_time`` is still None, the
    seconds elapsed since ``starttime`` become the queueing time and are
    printed. In every other case ``queueing_time`` is returned unchanged.
    """
    if pilot_state == "Running" and queueing_time is None:
        queueing_time = time.time() - starttime
        print("*** Pilot State: " + pilot_state + " queue time: " + str(queueing_time))
    return queueing_time


def _subjob_description(index):
    """Build the SAGA job description of the ``index``-th ``/bin/date`` sub-job."""
    jd = saga.job.description()
    jd.executable = "/bin/date"
    jd.number_of_processes = "1"
    jd.spmd_variation = "single"
    jd.arguments = [""]
    jd.working_directory = os.getcwd()
    # Per-index stdout/stderr files so sub-jobs do not overwrite each other.
    jd.output = "sj-stdout-" + str(index) + ".txt"
    jd.error = "sj-stderr-" + str(index) + ".txt"
    return jd


def _update_job_states(jobs, job_states, job_start_times):
    """Poll every sub-job once and return how many are in a terminal state.

    Prints each state transition (plus the elapsed runtime when a job has just
    reached a terminal state) and stores the new state in ``job_states``.
    """
    finished = 0
    for sj in jobs:
        old_state = job_states[sj]
        state = sj.get_state()
        if old_state != state:
            print("Job " + str(sj) + " changed from: " + old_state + " to " + state)
            if has_finished(state):
                print("Job: " + str(sj) + " Runtime: "
                      + str(time.time() - job_start_times[sj]) + " s.")
        if has_finished(state):
            finished += 1
        job_states[sj] = state
    return finished


def load_test(coordination_url, number_jobs, number_nodes, number_cores_per_node,
              lrms_url=None, poll_interval=2):
    """Run one BigJob load-test scenario and return its result as a CSV row.

    The scenario proceeds as follows:

    1. Start a pilot job (BigJob) on ``lrms_url``.
    2. Submit ``number_jobs`` ``/bin/date`` sub-jobs to it.
    3. Poll every ``poll_interval`` seconds until all sub-jobs are in a
       terminal state, or until the pilot reports "Failed".
    4. Cancel the pilot. This also happens if an exception escapes after
       the pilot was started.

    Args:
        coordination_url: coordination URL passed to ``bigjob()``.
        number_jobs: number of sub-jobs to submit.
        number_nodes: number of nodes requested for the pilot.
        number_cores_per_node: processes per node requested for the pilot.
        lrms_url: resource-manager URL; defaults to the module-level LRMS_URL.
        poll_interval: seconds to sleep between status polls.

    Returns:
        A comma-separated string with these fields, in order: nodes,
        cores/node, jobs, total runtime, queueing time, coordination URL,
        LRMS URL, sub-job submission time. Queueing time is "None" if the
        pilot was never observed in the "Running" state.
    """
    if lrms_url is None:
        lrms_url = LRMS_URL
    banner = "\n**************************************************************************************************************************************************\n"

    print(banner)
    print("Start test scenario - #nodes:%d, #cores/node:%d, #jobs: %d, coordination-url:%s, lrms-url:%s" %
          (number_nodes, number_cores_per_node, number_jobs, coordination_url, lrms_url))
    print(banner)

    starttime = time.time()
    workingdirectory = os.path.join(os.getcwd(), "agent")

    print("Start Pilot Job/BigJob at: " + lrms_url)
    bj = bigjob(coordination_url)
    bj.start_pilot_job(lrms_url=lrms_url,
                       number_nodes=number_nodes,
                       processes_per_node=number_cores_per_node,
                       working_directory=workingdirectory)

    try:
        queueing_time = None
        pilot_state = str(bj.get_state_detail())
        queueing_time = _record_queueing_time(pilot_state, queueing_time, starttime)
        print("Pilot Job/BigJob URL: " + bj.pilot_url + " State: " + pilot_state)

        jobs = []
        job_start_times = {}
        job_states = {}
        for i in range(number_jobs):
            jd = _subjob_description(i)
            sj = subjob()
            sj.submit_job(bj.pilot_url, jd)
            jobs.append(sj)
            job_start_times[sj] = time.time()
            job_states[sj] = sj.get_state()

            # Keep checking the pilot during submission so the queueing time is
            # captured as soon as it starts running.
            if pilot_state != "Running":
                pilot_state = str(bj.get_state_detail())
                queueing_time = _record_queueing_time(pilot_state, queueing_time, starttime)

        subjob_submission_time = time.time() - starttime

        while True:
            # NOTE(review): this loop uses get_state() while the code above uses
            # get_state_detail(); both are compared against "Running" -- confirm
            # which call actually returns that string.
            pilot_state = str(bj.get_state())
            queueing_time = _record_queueing_time(pilot_state, queueing_time, starttime)
            finish_counter = _update_job_states(jobs, job_states, job_start_times)

            print("Pilot State: %s; %d/%d jobs finished" % (pilot_state, finish_counter, number_jobs))
            # Wait for *every* job; the previous `number_jobs - 1` threshold
            # stopped one job early and under-reported the runtime.
            # NOTE(review): a pilot that ends in a terminal state other than
            # "Failed" while jobs are still pending would keep this loop polling.
            if finish_counter >= number_jobs or pilot_state == "Failed":
                break
            time.sleep(poll_interval)

        runtime = time.time() - starttime

        result = ("%d,%d,%d,%s,%s,%s,%s,%s" %
                  (number_nodes, number_cores_per_node, number_jobs, str(runtime),
                   str(queueing_time), coordination_url, lrms_url,
                   str(subjob_submission_time)))

        result_tab = ("%d\t%d\t%d\t%s\t%s\t%s\t%s" %
                      (number_nodes, number_cores_per_node, number_jobs, str(runtime),
                       str(queueing_time), coordination_url, lrms_url))
        print("#Nodes\t#cores/node\t#jobs\tRuntime\tQueuing Time\tCoordination URL\tLRMS URL")
        print(result_tab)
    finally:
        # Always release the pilot, even when submission or polling raised.
        bj.cancel()
        # NOTE(review): this deletes *every* queued job owned by the current
        # user (not only this pilot), and presumably only has an effect on
        # hosts with PBS/Torque qstat/qdel installed -- confirm it is wanted.
        os.system("qstat -u `whoami` | grep -o ^[0-9]* |xargs qdel")
    return result
141
142
143
# Test job submission via BigJob: run every configured scenario and write one
# CSV row per scenario to a timestamped file under RESULT_DIR.
if __name__ == "__main__":
    try:
        os.mkdir(RESULT_DIR)
    except OSError:
        # Typically the directory already exists; any real problem will
        # surface when the result file is opened below.
        pass

    # Scenarios are paired by index; fail loudly instead of silently
    # dropping unmatched entries.
    if len(NUMBER_JOBS) != len(NUMBER_NODES):
        raise ValueError("NUMBER_JOBS and NUMBER_NODES must have the same length")

    d = datetime.datetime.now()
    result_filename = RESULT_FILE_PREFIX + d.strftime("%Y%m%d-%H%M%S") + ".csv"
    with open(result_filename, "w") as f:
        f.write("#Nodes,#cores/node,#jobs,Runtime,Queuing Time,Coordination URL,LRMS URL,SubJob Submission Time\n")
        for number_jobs, number_nodes in zip(NUMBER_JOBS, NUMBER_NODES):
            result = load_test(COORDINATION_URL, number_jobs, number_nodes, NUMBER_CORES_PER_NODE)
            f.write(result + "\n")
            # Flush after each scenario so completed rows survive a later crash.
            f.flush()
160