
Source Code for Module examples.loadtest

import saga
import os
import time
import sys
import pdb
import datetime

from bigjob.bigjob_manager import bigjob, subjob

# configuration
#COORDINATION_URL = "advert://advert.cct.lsu.edu:8080"
#COORDINATION_URL = "redis://localhost"
#COORDINATION_URL = "redis://i136"
#COORDINATION_URL = "redis://cyder.cct.lsu.edu:8080"
#COORDINATION_URL = "redis://cyder.cct.lsu.edu:8080"
COORDINATION_URL = "tcp://localhost"

#NUMBER_JOBS=[256]
NUMBER_JOBS=[32]
NUMBER_NODES=[2]
#NUMBER_NODES=[16]
NUMBER_CORES_PER_NODE=1
RESULT_DIR="results"
RESULT_FILE_PREFIX="results/results-"
LRMS_URL="fork://localhost"

def has_finished(state):
    state = state.lower()
    if state=="done" or state=="failed" or state=="canceled":
        return True
    else:
        return False


def load_test(coordination_url, number_jobs, number_nodes, number_cores_per_node):

    print "\n**************************************************************************************************************************************************\n"
    print ("Start test scenario - #nodes:%d, #cores/node:%d, #jobs: %d, coordination-url:%s, lrms-url:%s"%
           (number_nodes, number_cores_per_node, number_jobs, coordination_url, LRMS_URL))
    print "\n**************************************************************************************************************************************************\n"

    starttime=time.time()
    ##########################################################################################
    # Start BigJob
    # Parameter for BigJob
    lrms_url = LRMS_URL
    workingdirectory=os.getcwd() +"/agent"   # working directory for agent

    # start pilot job (bigjob_agent)
    print "Start Pilot Job/BigJob at: " + lrms_url
    bj = bigjob(coordination_url)
    bj.start_pilot_job(lrms_url=lrms_url,
                       number_nodes=number_nodes,
                       processes_per_node=number_cores_per_node,
                       working_directory=workingdirectory
                       )

    queueing_time = None
    subjob_submission_time = None
    pilot_state = str(bj.get_state_detail())
    if pilot_state=="Running" and queueing_time==None:
        queueing_time=time.time()-starttime
        print "*** Pilot State: " + pilot_state + " queue time: " + str(queueing_time)
    print "Pilot Job/BigJob URL: " + bj.pilot_url + " State: " + pilot_state

    ##########################################################################################
    # Submit SubJob through BigJob
    jobs = []
    job_start_times = {}
    job_states = {}
    for i in range(0, number_jobs):
        jd = saga.job.description()
        jd.executable = "/bin/date"
        jd.number_of_processes = "1"
        jd.spmd_variation = "single"
        jd.arguments = [""]
        jd.working_directory = os.getcwd()
        jd.output = "sj-stdout-"+str(i)+".txt"
        jd.error = "sj-stderr-"+str(i)+".txt"

        sj = subjob()
        sj.submit_job(bj.pilot_url, jd)
        jobs.append(sj)
        job_start_times[sj]=time.time()
        job_states[sj] = sj.get_state()

        if pilot_state != "Running":
            pilot_state = str(bj.get_state_detail())
            if pilot_state=="Running" and queueing_time==None:
                queueing_time=time.time()-starttime
                print "*** Pilot State: " + pilot_state + " queue time: " + str(queueing_time)

    subjob_submission_time = time.time()-starttime
    # busy wait for completion
    while 1:
        pilot_state = str(bj.get_state())
        if pilot_state=="Running" and queueing_time==None:
            queueing_time=time.time()-starttime
            print "*** Pilot State: " + pilot_state + " queue time: " + str(queueing_time)
        finish_counter=0
        result_map = {}
        for i in range(0, number_jobs):
            old_state = job_states[jobs[i]]
            state = jobs[i].get_state()
            if result_map.has_key(state)==False:
                result_map[state]=1
            else:
                result_map[state] = result_map[state]+1
            #pdb.set_trace()
            if old_state != state:
                print "Job " + str(jobs[i]) + " changed from: " + old_state + " to " + state
            if old_state != state and has_finished(state)==True:
                print "Job: " + str(jobs[i]) + " Runtime: " + str(time.time()-job_start_times[jobs[i]]) + " s."
            if has_finished(state)==True:
                finish_counter = finish_counter + 1
            job_states[jobs[i]]=state

        print "Pilot State: %s; %d/%d jobs finished"%(pilot_state,finish_counter,number_jobs)
        if finish_counter >= number_jobs-1 or pilot_state == "Failed":
            break
        time.sleep(2)

    runtime = time.time()-starttime
    #print "Runtime: " + str(runtime) + " s; Runtime per Job: " + str(runtime/NUMBER_JOBS)
    ##########################################################################################
    # Cleanup - stop BigJob

    result = ("%d,%d,%d,%s,%s,%s,%s,%s"%
              (number_nodes, number_cores_per_node, number_jobs, str(runtime), str(queueing_time), coordination_url, LRMS_URL, str(subjob_submission_time)))

    result_tab = ("%d\t%d\t%d\t%s\t%s\t%s\t%s"%
                  (number_nodes, number_cores_per_node, number_jobs, str(runtime), str(queueing_time), coordination_url, LRMS_URL))
    print ("#Nodes\t#cores/node\t#jobs\tRuntime\tQueuing Time\tCoordination URL\tLRMS URL")
    print result_tab

    bj.cancel()
    # hack: delete manually pbs jobs of user
    os.system("qstat -u `whoami` | grep -o ^[0-9]* |xargs qdel")
    return result



""" Test Job Submission via BigJob """
if __name__ == "__main__":
    try:
        os.mkdir(RESULT_DIR)
    except:
        pass
    d =datetime.datetime.now()
    result_filename = RESULT_FILE_PREFIX + d.strftime("%Y%m%d-%H%M%S") + ".csv"
    f = open(result_filename, "w")
    f.write("#Nodes,#cores/node,#jobs,Runtime,Queuing Time,Coordination URL,LRMS URL,SubJob Submission Time\n")
    for i in range(0, len(NUMBER_JOBS)):
        result = load_test(COORDINATION_URL, NUMBER_JOBS[i], NUMBER_NODES[i], NUMBER_CORES_PER_NODE)
        f.write(result)
        f.write("\n")
        f.flush()
    f.close()
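
For orientation, the sketch below shows how load_test() could be driven directly for a small parameter sweep instead of through the __main__ block above. It is a minimal sketch, assuming the module is importable as examples.loadtest and that the coordination service behind COORDINATION_URL is actually running; the (nodes, jobs) combinations are illustrative values, not taken from the module.

# Hypothetical driver sketch (assumptions: examples.loadtest is on the
# PYTHONPATH and the coordination service named by COORDINATION_URL is up).
from examples.loadtest import load_test, COORDINATION_URL, NUMBER_CORES_PER_NODE

rows = []
for nodes, jobs in [(2, 32), (4, 64)]:   # illustrative scenarios only
    rows.append(load_test(COORDINATION_URL, jobs, nodes, NUMBER_CORES_PER_NODE))

# each row is the comma-separated result line returned by load_test()
for row in rows:
    print row

Each returned row follows the same column order as the CSV header written by the __main__ block, so the sketch can feed the same results file if desired.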