"""many_job example with affinity.

This module is used to launch a set of bigjobs.

"""
import getopt
import time
import pdb
import os
import traceback
import logging
import sys


sys.path.insert(0, os.getcwd() + "/../")


from bigjob import bigjob, subjob, description
from bigjob_dynamic.many_job_affinity import *


24 """ This variable defines the coordination system that is used by BigJob
25 e.g.
26 advert://localhost (SAGA/Advert SQLITE)
27 advert://advert.cct.lsu.edu:8080 (SAGA/Advert POSTGRESQL)
28 redis://localhost:6379 (Redis at localhost)
29 tcp://localhost (ZMQ)
30 """
31 COORDINATION_URL = "advert://localhost/?dbtype=sqlite3"
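# Any of the backends listed above can be used instead, e.g.:
# COORDINATION_URL = "redis://localhost:6379"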

NUMBER_JOBS = 8


def has_finished(state):
    state = state.lower()
    if state == "done" or state == "failed" or state == "canceled":
        return True
    else:
        return False


""" Test Job Submission via ManyJob abstraction """
if __name__ == "__main__":
    try:
        print "ManyJob load test with " + str(NUMBER_JOBS) + " jobs."
        starttime = time.time()

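        # Build the resource pool: one local BigJob ("fork://localhost/") with
        # two processes, tagged with the affinity label "affinity1" so that
        # sub-jobs requesting this affinity are scheduled onto it.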
        resource_list = []
        resource_list.append({"resource_url": "fork://localhost/", "number_of_processes": "2", "allocation": "myAllocation",
                              "queue": "workq",
                              "working_directory": (os.getcwd() + "/agent"),
                              "walltime": 10, "affinity": "affinity1"})

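        # Start the ManyJob service, which launches the bigjobs on the
        # resources in the pool and coordinates them via COORDINATION_URL.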
        print "Create manyjob service "
        mjs = many_job_affinity_service(resource_list, COORDINATION_URL)

        jobs = []
        job_start_times = {}
        job_states = {}
        cwd = os.getcwd()
        for i in range(0, NUMBER_JOBS):
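            # Describe one sub-job; the "affinity=affinity1" environment entry
            # must match the affinity label of a resource defined above.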
            jd = description()
            jd.executable = "/bin/date"
            jd.number_of_processes = "1"
            jd.spmd_variation = "single"
            jd.arguments = [""]
            jd.working_directory = "/Users/luckow"
            jd.output = os.getcwd() + "/stdout-" + str(i) + ".txt"
            jd.error = os.getcwd() + "/stderr-" + str(i) + ".txt"
            jd.environment = ["affinity=affinity1"]
            subjob = mjs.create_job(jd)
            subjob.run()
            print "Submitted sub-job " + "%d" % i + "."
            jobs.append(subjob)
            job_start_times[subjob] = time.time()
            job_states[subjob] = subjob.get_state()
        print "************************ All Jobs submitted ************************"
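        # Poll every 5 seconds until all sub-jobs have reached a terminal
        # state (done, failed, or canceled).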
        while 1:
            finish_counter = 0
            result_map = {}
            for i in range(0, NUMBER_JOBS):
                old_state = job_states[jobs[i]]
                state = jobs[i].get_state()
                if result_map.has_key(state) == False:
                    result_map[state] = 0
                result_map[state] = result_map[state] + 1

                if old_state != state:
                    print "Job " + str(jobs[i]) + " changed from: " + old_state + " to " + state
                if old_state != state and has_finished(state) == True:
                    print "Job: " + str(jobs[i]) + " Runtime: " + str(time.time() - job_start_times[jobs[i]]) + " s."
                if has_finished(state) == True:
                    finish_counter = finish_counter + 1
                job_states[jobs[i]] = state

            print "Current states: " + str(result_map)
            time.sleep(5)
            if finish_counter == NUMBER_JOBS:
                break

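        # All sub-jobs finished: shut down the ManyJob service and report timing.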
        mjs.cancel()
        runtime = time.time() - starttime
        print "Runtime: " + str(runtime) + " s; Runtime per Job: " + str(runtime / NUMBER_JOBS)
    except:
        traceback.print_exc(file=sys.stdout)
        try:
            mjs.cancel()
        except:
            pass