1 """ Example application demonstrating job submission via bigjob
2
3 DON'T EDIT THIS FILE (UNLESS THERE IS A BUG)
4
5 THIS FILE SHOULD NOT BE COMMITTED TO SVN WITH USER-SPECIFIC PATHS!
6 """
7
8 import os
9 import time
10 import pdb
11
12
13 import sys
14 sys.path.insert(0, os.getcwd() + "/../")
15
16
17 """ This variable defines the coordination system that is used by BigJob
18 e.g.
19 advert://localhost (SAGA/Advert SQLITE)
20 advert://advert.cct.lsu.edu:8080 (SAGA/Advert POSTGRESQL)
21 advert://advert.cct.lsu.edu:5432 (SAGA/Advert POSTGRESQL)
22 redis://localhost:6379 (Redis at localhost)
23 tcp://localhost (ZMQ)
24 tcp://* (ZMQ - listening to all interfaces)
25 """
26
27
28
29
30 COORDINATION_URL = "advert://localhost/?dbtype=sqlite3"
31
32 from bigjob import bigjob, subjob, description
33
34
35
36 NUMBER_JOBS=8
37
def has_finished(state):
    """Return True if *state* names a terminal subjob state.

    The comparison is case-insensitive; terminal states are
    "done", "failed" and "canceled".
    """
    # Restored the missing `def` line: the original file had this body at
    # module level, which is a SyntaxError (`return` outside function).
    state = state.lower()
    return state in ("done", "failed", "canceled")
44
45
46 """ Test Job Submission via Advert """
47 if __name__ == "__main__":
48
49 starttime=time.time()
50
51
52
53 queue=None
54 project=None
55 walltime=100
56 processes_per_node=4
57 number_of_processes =2
58 workingdirectory= os.path.join(os.getcwd(), "agent")
59 userproxy = None
60
61
62 """
63 URL of the SAGA Job Service that is used to dispatch the pilot job.
64 The following URLs are accepted:
65
66 lrms_url = "gram://oliver1.loni.org/jobmanager-pbs" # globus resource url used when globus is used. (LONI)
67 lrms_url = "pbspro://louie1.loni.org" # pbspro resource url used when pbspro scheduling system is used.(Futuregrid or LSU Machines)
68 lrms_url = "ssh://louie1.loni.org" # ssh resource url which launches jobs on target machine. Jobs not submitted to scheduling system.
69 lrms_url = "pbs-ssh://louie1.loni.org" # Submit jobs to scheduling system of remote machine.
70 lrms_url = "xt5torque://localhost" # torque resource url.
71
72 Please ensure that the respective SAGA adaptor is installed and working
73 """
74 lrms_url = "fork://localhost"
75
76
77
78 print "Start Pilot Job/BigJob at: " + lrms_url
79 bj = bigjob(COORDINATION_URL)
80 bj.start_pilot_job( lrms_url,
81 None,
82 number_of_processes,
83 queue,
84 project,
85 workingdirectory,
86 userproxy,
87 walltime,
88 processes_per_node)
89
90 print "Pilot Job/BigJob URL: " + bj.pilot_url + " State: " + str(bj.get_state())
91
92
93
94 jobs = []
95 job_start_times = {}
96 job_states = {}
97 for i in range(0, NUMBER_JOBS):
98 jd = description()
99 jd.executable = "/bin/date"
100 jd.number_of_processes = "1"
101 jd.spmd_variation = "single"
102 jd.arguments = [""]
103 jd.output = "sj-stdout-"+str(i)+".txt"
104 jd.error = "sj-stderr-"+str(i)+".txt"
105
106 sj = subjob()
107 sj.submit_job(bj.pilot_url, jd)
108 jobs.append(sj)
109 job_start_times[sj]=time.time()
110 job_states[sj] = sj.get_state()
111
112
113 while 1:
114 finish_counter=0
115 result_map = {}
116 for i in range(0, NUMBER_JOBS):
117 old_state = job_states[jobs[i]]
118 state = jobs[i].get_state()
119
120 if result_map.has_key(state)==False:
121 result_map[state]=1
122 else:
123 result_map[state] = result_map[state]+1
124
125 if old_state != state:
126 print "Job " + str(jobs[i]) + " changed from: " + old_state + " to " + state
127 if old_state != state and has_finished(state)==True:
128 print "Job: " + str(jobs[i]) + " Runtime: " + str(time.time()-job_start_times[jobs[i]]) + " s."
129 if has_finished(state)==True:
130 finish_counter = finish_counter + 1
131 job_states[jobs[i]]=state
132
133 if finish_counter == NUMBER_JOBS:
134 break
135 time.sleep(2)
136
137 runtime = time.time()-starttime
138 print "Runtime: " + str(runtime) + " s; Runtime per Job: " + str(runtime/NUMBER_JOBS)
139
140
141 bj.cancel()
142
143