1
2
3 import textwrap
4 import re
5 import saga
6 import os
7
9 """Constructor"""
def __init__(self, bootstrap_script, lrms_saga_url, walltime, nodes, ppn,
             userproxy, working_directory=None):
    """Prepare the remote submission program for a PBS pilot job.

    Nothing is submitted here. The constructor stores its arguments and
    builds ``self.bootstrap_script``: a double-quoted, escaped Python
    program that, when executed, writes a qsub file named
    ``bigjob_pbs_ssh`` and passes it to ``qsub``.

    Args:
        bootstrap_script: Python source embedded in the qsub file as the
            argument of ``python -c``.
        lrms_saga_url: URL object of the resource manager. NOTE: its
            ``scheme`` attribute is overwritten with ``"ssh"`` in place,
            so the caller's object is modified as well.
        walltime: Value written after ``#PBS -l walltime=``.
        nodes: Value written after ``#PBS -l nodes=``.
        ppn: Value written after ``:ppn=``.
        userproxy: Stored unchanged in ``self.userproxy``.
        working_directory: Directory the qsub file changes into; ``None``
            is stored as the empty string.
    """
    self.job_id = ""
    self.lrms_saga_url = lrms_saga_url
    self.lrms_saga_url.scheme = "ssh"
    self.userproxy = userproxy
    # Identity test instead of ``== None`` so a custom __eq__ on the
    # argument can never influence the default handling.
    self.working_directory = "" if working_directory is None else working_directory

    # Template of the program executed on the remote side. The five %s
    # slots are filled with nodes, ppn, walltime, working directory and the
    # caller's bootstrap script, in that order. "XX" marks the quotes around
    # the ``python -c`` argument; they are substituted after escaping.
    submit_script = textwrap.dedent("""import sys
import os
import urllib
import sys
import time
import textwrap

qsub_file_name="bigjob_pbs_ssh"

qsub_file = open(qsub_file_name, "w")
qsub_file.write("#PBS -l nodes=%s:ppn=%s")
qsub_file.write("\\n")
qsub_file.write("#PBS -l walltime=%s")
qsub_file.write("\\n")
qsub_file.write("cd %s")
qsub_file.write("\\n")
qsub_file.write("python -c XX" + textwrap.dedent(\"\"%s\"\") + "XX")
qsub_file.close()
os.system( "qsub " + qsub_file_name)
""") % (str(nodes), str(ppn), str(walltime),
        str(self.working_directory), bootstrap_script)

    # The escaping steps depend on each other and must stay in this order:
    # escape double quotes, expand doubled backslashes, then turn the XX
    # placeholders into an escaped quote, and finally wrap the whole
    # program in double quotes.
    submit_script = submit_script.replace("\"", "\\\"")
    submit_script = submit_script.replace("\\\\", "\\\\\\\\\\")
    submit_script = submit_script.replace("XX", "\\\\\\\"")
    self.bootstrap_script = "\"" + submit_script + "\""
46
47
# Describe the remote command: `python -c <bootstrap script>`, run in the
# configured working directory.
pilot_description = saga.job.description()
pilot_description.arguments = ["-c", self.bootstrap_script]
pilot_description.executable = "python"
pilot_description.working_directory = self.working_directory
pilot_description.set_attribute("Interactive", "True")

# With a user proxy, create the job service on a session that carries an
# x509 context; otherwise create it without an explicit session.
# Single-argument print(...) behaves the same on Python 2 and Python 3.
if self.userproxy is not None and self.userproxy != '':
    proxy_session = saga.session()
    os.environ["X509_USER_PROXY"] = self.userproxy
    proxy_context = saga.context("x509")
    proxy_context.set_attribute("UserProxy", self.userproxy)
    proxy_session.add_context(proxy_context)
    print("use proxy: " + self.userproxy)
    js = saga.job.service(proxy_session, self.lrms_saga_url)
else:
    print("use standard proxy")
    js = saga.job.service(self.lrms_saga_url)

pbssshjob = js.create_job(pilot_description)
print("Submit pilot job to: " + str(self.lrms_saga_url))
pbssshjob.run()

# The job id is the text before the first "." of the job's stdout
# (presumably qsub's "<id>.<server>" reply -- confirm against the target
# batch system). strip() keeps a trailing newline out of the id when the
# output contains no "." separator.
joboutput = pbssshjob.get_stdout()
self.job_id = joboutput.read().split(".")[0].strip()
72
74 jd = saga.job.description()
75 jd.set_attribute("Executable", "qstat")
76 jd.set_attribute("Interactive", "True")
77 jd.set_vector_attribute("Arguments", [self.job_id])
78
79 js = saga.job.service(self.lrms_saga_url);
80
81 job = js.create_job(jd)
82 job.run()
83
84 job.wait(-1)
85
86 output = job.get_stdout()
87 output.readline()
88 output.readline()
89 state=output.readline()
90 state=(re.sub(r'\s+',' ',state)).split(' ')[4]
91 if state == "R":
92 state = "Running"
93 elif state == "C" or state == "E":
94 state = "Done"
95 else:
96 state = "Unknown"
97 return state
98
# Remove the pilot job from the batch system by running `qdel <job_id>`
# through a job service created for the resource URL.
qdel_description = saga.job.description()
for attribute, value in (("Executable", "qdel"), ("Interactive", "True")):
    qdel_description.set_attribute(attribute, value)
qdel_description.set_vector_attribute("Arguments", [self.job_id])

qdel_job = saga.job.service(self.lrms_saga_url).create_job(qdel_description)
qdel_job.run()
# wait(-1): presumably blocks until qdel has finished -- confirm in the
# SAGA job API.
qdel_job.wait(-1)
111