Package bigjob :: Module pbsssh
[hide private]
[frames] | no frames]

Source Code for Module bigjob.pbsssh

  1  #!/usr/bin/env python 
  2   
  3  import textwrap 
  4  import re 
  5  import saga 
  6  import os 
  7   
8 -class pbsssh:
9 """Constructor"""
10 - def __init__(self,bootstrap_script,lrms_saga_url,walltime,nodes,ppn,userproxy,working_directory=None):
11 self.job_id = "" 12 self.lrms_saga_url = lrms_saga_url 13 self.lrms_saga_url.scheme="ssh" 14 self.userproxy = userproxy 15 self.working_directory = "" 16 if working_directory == None: 17 self.working_directory = "" 18 else: 19 self.working_directory = working_directory 20 21 self.bootstrap_script = textwrap.dedent("""import sys 22 import os 23 import urllib 24 import sys 25 import time 26 import textwrap 27 28 qsub_file_name="bigjob_pbs_ssh" 29 30 qsub_file = open(qsub_file_name, "w") 31 qsub_file.write("#PBS -l nodes=%s:ppn=%s") 32 qsub_file.write("\\n") 33 qsub_file.write("#PBS -l walltime=%s") 34 qsub_file.write("\\n") 35 qsub_file.write("cd %s") 36 qsub_file.write("\\n") 37 qsub_file.write("python -c XX" + textwrap.dedent(\"\"%s\"\") + "XX") 38 qsub_file.close() 39 os.system( "qsub " + qsub_file_name) 40 """) % (str(nodes),str(ppn),str(walltime),str(self.working_directory),bootstrap_script) 41 ### escaping characters 42 self.bootstrap_script = self.bootstrap_script.replace("\"","\\\"") 43 self.bootstrap_script = self.bootstrap_script.replace("\\\\","\\\\\\\\\\") 44 self.bootstrap_script = self.bootstrap_script.replace("XX","\\\\\\\"") 45 self.bootstrap_script = "\"" + self.bootstrap_script+ "\""
46 47
48 - def run(self):
49 jd = saga.job.description() 50 jd.arguments = ["-c", self.bootstrap_script] 51 jd.executable = "python" 52 jd.working_directory = self.working_directory 53 jd.set_attribute("Interactive", "True") 54 # Submit job 55 js = None 56 if self.userproxy != None and self.userproxy != '': 57 s = saga.session() 58 os.environ["X509_USER_PROXY"]=self.userproxy 59 ctx = saga.context("x509") 60 ctx.set_attribute ("UserProxy", self.userproxy) 61 s.add_context(ctx) 62 print "use proxy: " + self.userproxy 63 js = saga.job.service(s, self.lrms_saga_url) 64 else: 65 print "use standard proxy" 66 js = saga.job.service(self.lrms_saga_url) 67 pbssshjob = js.create_job(jd) 68 print "Submit pilot job to: " + str(self.lrms_saga_url) 69 pbssshjob.run() 70 joboutput= pbssshjob.get_stdout() 71 self.job_id=(joboutput.read()).split(".")[0]
72
73 - def get_state(self):
74 jd = saga.job.description() 75 jd.set_attribute("Executable", "qstat") 76 jd.set_attribute("Interactive", "True") 77 jd.set_vector_attribute("Arguments", [self.job_id]) 78 # connect to the local job service 79 js = saga.job.service(self.lrms_saga_url); 80 # submit the job 81 job = js.create_job(jd) 82 job.run() 83 # wait for the job to complete 84 job.wait(-1) 85 # print the job's output 86 output = job.get_stdout() 87 output.readline() 88 output.readline() 89 state=output.readline() 90 state=(re.sub(r'\s+',' ',state)).split(' ')[4] 91 if state == "R": 92 state = "Running" 93 elif state == "C" or state == "E": 94 state = "Done" 95 else: 96 state = "Unknown" 97 return state
98
99 - def cancel(self):
100 jd = saga.job.description() 101 jd.set_attribute("Executable", "qdel") 102 jd.set_attribute("Interactive", "True") 103 jd.set_vector_attribute("Arguments", [self.job_id]) 104 # connect to the local job service 105 js = saga.job.service(self.lrms_saga_url); 106 # submit the job 107 job = js.create_job(jd) 108 job.run() 109 # wait for the job to complete 110 job.wait(-1)
111