import os, sys, re, stat import restartlib import shutil import time from libutil import * import simenv,simlib,simsubs,simarchive,simproperties # this class is the encapsulation of a 'Restart' # which is a submitted simulation created and managed by # simfactory class SimRestart: def __init__(self): self.JobID = -1 self.RestartID = -1 self.LongRestartID = -1 self.SimulationName = None self.Properties = None self.SimulationLog = None self.Initialized = False def ClearMark(self, label="timestamp"): expression = "%s-([0-9\\.]+$)" % label rx = re.compile(expression) # make this as quick as possible for fptr in os.listdir(self.InternalDir): matches = rx.match(fptr) if matches != None: path = simlib.BuildPath(self.InternalDir, fptr) # remove existing mark try: os.unlink(path) except OSError, e: fatal("unable to delete simulation mark file %s, %s" % (path, e)) def mark(self, label="timestamp"): self.ClearMark(label) timestamp = time.time() np = simlib.BuildPath(self.InternalDir, "%s-%s" % (label, timestamp)) try: fptr = open(np, 'w+') fptr.close() except OSError, e: fatal("Unable to write %s file %s, %s" % (label, np, e)) def GetMarkTime(self, label="timestamp"): expression = "%s-([0-9\\.]+$)" % label rx = re.compile(expression) # make this as quick as possible for fptr in os.listdir(self.InternalDir): matches = rx.match(fptr) if matches != None: rr = matches.group(1) try: return float(rr) except: fatal("could not coerce %s %s into a float" % (label, rr)) return None def load(self, simulationName, restartid=None): if self.Initialized: # clean up before attempting to reload. self.done() self.Initialized = False # need basedir, simulationdir, and internaldir machineEntry = simenv.LocalMachineEntry self.SimulationName = simulationName self.BaseDir = simlib.GetBaseDir(machineEntry) self.SimulationDir = simlib.BuildPath(self.BaseDir, self.SimulationName) if not(os.path.exists(self.SimulationDir)): warning("simulation \"%s\" does not exist or is not readable" % self.SimulationName) return -1 self.InternalDir = simlib.BuildPath(self.SimulationDir, simenv.INTERNALDIRECTORY) self.CacheDir = simlib.BuildPath(self.BaseDir, 'CACHE') self.SimulationLog = restartlib.SimulationLog(self) sim1_marker = simlib.BuildPath(self.SimulationDir, 'SIMULATION_ID') if os.path.exists(sim1_marker): return -2 if restartid == None: propertyFile = simlib.BuildPath(self.InternalDir, "properties.ini") if not(os.path.exists(propertyFile)): warning("properties.ini does not exist for simulation %s" % self.SimulationName) return -1 self.Properties = simproperties.SimProperties() self.Properties.init(propertyFile) return 1 # load the specific restart. self.RestartID = int(restartid) self.LongRestartID = "%04d" % int(restartid) self.RestartDir = simlib.BuildPath(self.SimulationDir, "output-%s" % self.LongRestartID) self.InternalDir = simlib.BuildPath(self.RestartDir, simenv.INTERNALDIRECTORY) self.SimulationLog.Write("For simulation %s, loaded restart id %s, long restart id %s" % (self.SimulationName, self.RestartID, self.LongRestartID)) self.RestartIDs = restartlib.GetRestartIds(self) self.MaxRestartID = self.RestartIDs[-1] propertyFile = simlib.BuildPath(self.InternalDir, "properties.ini") if not(os.path.exists(propertyFile)): warning("properties.ini does not exist for simulation %s" % self.SimulationName) return -1 self.Properties = simproperties.SimProperties() self.Properties.init(propertyFile) self.Initialized = True return 1 def PrepareCheckpointing(self): current_parfile = self.Properties.parfile cpf = simlib.FileBaseName(current_parfile) cwork_dir = simlib.BuildPath(self.RestartDir, cpf) self.SimulationLog.Write("Current working directory for simulation is: %s" % cwork_dir) if not(os.path.exists(cwork_dir)): self.SimulationLog.Write("%s does not exist, creating" % cwork_dir) os.makedirs(cwork_dir) chkpoint_files = restartlib.GetCheckpointFiles(self.SimulationDir) if len(chkpoint_files) == 0: return False self.SimulationLog.Write("Checkpointing files found. Linking files") for ff in chkpoint_files: self.SimulationLog.Write("checkpoint file: %s" % ff) for ff in chkpoint_files: if not(os.path.exists(ff)): continue # need to extract output-xxxx and replace it with current restart. rx = r"(output-\d{4})" rr = re.compile(rx) matches = rr.search(ff) if matches == None: warning("unable to find restart dir for checkpoint file %s" % ff) continue try: restart_indicator = matches.group(1) dfile = ff.replace(restart_indicator, "output-%s" % self.LongRestartID).strip() self.SimulationLog.Write("linking checkpoint file %s to %s" % (ff, dfile)) info("linking checkpoint file %s to %s" % (ff, dfile)) try: if not os.access(os.path.dirname(dfile), os.F_OK): os.makedirs(os.path.dirname(dfile)) # since we are linking checkpoint files from every restart # make sure the final path doesn't exist before attempting # to link. if not(os.path.exists(dfile)): os.link(ff, dfile) except: shutil.copyfile(ff, dfile) except OSError, e: warning("Could not link checkpoint file %s to %s, %s" % (ff, dfile, e)) warning("Disabling checkpointing") return False return True def initRestart(self, simulationName): # first order of business is to attempt to clean up the simulation being worked on. restartlib.CleanupSimulation(simulationName, quiet=True, ignoreTimestamps=False) ret = self.load(simulationName) if ret < 0: parfile = simlib.GetParFile() if parfile == None: fatal("could not create simulation %s, no parameter file specified" % simulationName) display("Parameter file: %s" % parfile) self.create(simulationName, parfile) self.load(simulationName) (rids, max_restart_id, my_restart_id) = self.GetRestartId() self.RestartIDs = rids self.MaxRestartID = max_restart_id info("Found the following restart_ids: %s" % rids) if max_restart_id != -1: info("Maximum restart id determined to be: %04d" % max_restart_id) display("Assigned restart id: %s " % my_restart_id) self.RestartID = my_restart_id self.LongRestartID = "%04d" % self.RestartID self.RestartDir = simlib.BuildPath(self.SimulationDir, "output-%s" % self.LongRestartID) self.InternalDir = simlib.BuildPath(self.RestartDir, simenv.INTERNALDIRECTORY) for dd in [self.RestartDir, self.InternalDir]: try: os.makedirs(dd) except OSError, e: fatal("could not create directory \"%s\", %s" % (dd, e)) pass self.Initialized = True def userSubmit(self, simulationName): # perform submission. self.initRestart(simulationName) assert(self.Initialized) submitScript = restartlib.RequireSubmitScript(self) if restartlib.CheckActive(simulationName): rs = SimRestart() rs.load(simulationName) chainedJobId = restartlib.GetActiveJobId(simulationName) self.ActiveRestartID = restartlib.GetActiveRestartId(rs) rs.done() submitScript = restartlib.RequireSubmitScript(self) warning("Simulation %s already has an active submission. Chaining this submission onto job id %s" % (simulationName, chainedJobId)) self.submit(submitScript, chainedJobId, self.ActiveRestartID) return self.submit(submitScript) def IsActive(self): active_id = restartlib.GetActiveRestartId(self) return active_id == self.RestartID def preSubmit(self, simulationName, chainedJobId=None, chainedRestartId=None): self.initRestart(simulationName) assert(self.Initialized) ss = simenv.OptionsManager.GetOption('submitscript') if ss != None and os.path.exists(ss): submitScript = ss else: submitScript = self.Properties.submitscript if len(submitScript) == 0 or submitScript == "None": fatal("no submission script defined for simulation %s, submission disabled" % self.SimulationName) if not(os.path.exists(submitScript)): fatal("could not read submit script %s" % submitScript) self.submit(submitScript, chainedJobId, chainedRestartId) def GetPreviousProperties(self): assert(self.Initialized) if self.MaxRestartID == -1: return None restore_dir = simlib.BuildPath(self.SimulationDir, "output-%04d" % self.MaxRestartID) previousIni = simlib.BuildPath(restore_dir, simenv.INTERNALDIRECTORY, "properties.ini") properties = None if os.path.exists(previousIni): properties = simproperties.SimProperties() properties.init(previousIni) return properties def makeActive(self): assert self.Initialized assert self.RestartID > -1 curdir = os.getcwd() # set our current working directory to the restart dir in case the # submit script writes out any files. # TODO: we must not chdir os.chdir(self.RestartDir) try: # create a symlink indicating that this job is active. # create a relative symlink, not an absolute one # TODO: What should we do on architectures that don't # support symbolic links? os.symlink("output-%s" % self.LongRestartID, "%s-active" % self.RestartDir) except: fatal("unable to activate simulation %s, with restart id %s" % (self.SimulationName, self.RestartID)) os.chdir(curdir) self.SimulationLog.Write("Simulation %s with restart-id %s has been made active" % (self.SimulationName, self.RestartID)) def extractJobId(self, output): assert(self.Initialized) machineEntry = simenv.LocalMachineEntry self.SimulationLog.Write("received raw output: %s" % output) submitRegex = machineEntry.submitpattern self.SimulationLog.Write("using submitRegex: %s" % submitRegex) rx = re.compile(submitRegex, re.MULTILINE) matches = rx.search(output) # if we didn't match anything, just use whatever got output. if matches == None: job_id = "-1" else: job_id = matches.group(1) return job_id def makeScratchdir(self, DefineDatabase): scratchdir = DefineDatabase.SubAll(simlib.GetMachineOption('scratchdir')) if not(os.path.exists(scratchdir)): if not(scratchdir.startswith("/")): sspath = simlib.BuildPath(self.RestartDir, scratchdir) if not(os.path.exists(sspath)): try: # have to rely on os.system due to the possibility of shell expansion # annoying. os.system("mkdir -p %s" % sspath) except OSError, e: fatal("Error: could not make scratch directory \"%s\", %s" % (sspath, e)) scratchdir = sspath else: try: os.system("mkdir -p %s" % scratchdir) except OSError, e: fatal("Error: could not make scratch directory \"%s\", %s" % (scratchdir, e)) sspath = simlib.BuildPath(self.RestartDir, "scratch") try: os.system("ln -s %s %s 2>&1" % (scratchdir, sspath)) except OSError, e: pass DefineDatabase.Set("SCRATCHDIR", scratchdir) def submit(self, submitScript, chainedJobId=None, chainedRestartId=None): assert(self.Initialized) UseChaining = False DefineDatabase = simsubs.DefineDatabase() machineEntry = simenv.LocalMachineEntry submitscript_contents = simlib.GetFileContents(submitScript) MaxWalltime = restartlib.WallTime(machineEntry.maxwalltime) if not(simenv.OptionsManager.HasOption('walltime')): Walltime = MaxWalltime else: Walltime = restartlib.WallTime(simenv.OptionsManager.GetOption('walltime')) self.SimulationLog.Write("Restart for simulation %s created with restart id %s, long restart id %s" % (self.SimulationName, self.RestartID, self.LongRestartID)) self.SimulationLog.Write("Prepping for submission") existingProperties = self.GetPreviousProperties() # import walltime if no --walltime is specified. if existingProperties != None and not simenv.OptionsManager.HasOption('walltime') and existingProperties.HasProperty('walltime'): Walltime = restartlib.WallTime(existingProperties.GetProperty("walltime")) self.SimulationLog.Write("Using walltime %s from previous restart %s" % (existingProperties.GetProperty("walltime"), self.MaxRestartID)) else: self.SimulationLog.Write("No previous walltime available to be used, or --walltime was specified, using walltime %s" % Walltime.Walltime) if simenv.OptionsManager.HasOption("from-restart-id"): self.Properties.AddProperty("from-restart-id", simenv.OptionsManager.GetOption("from-restart-id")) self.SimulationLog.Write("from-restart-id was specified, value is %s" % simenv.OptionsManager.GetOption("from-restart-id")) hostname = machineEntry.hostname user = simlib.GetUsername() memory = simlib.GetMachineOption('memory') cpufreq = simlib.GetMachineOption('cpufreq') allocation = simlib.GetMachineOption('allocation') queue = simlib.GetMachineOption('queue') (nodes, ppn_used, procs, ppn, procs_requested, num_procs, num_threads) = simlib.GetProcs(existingProperties) parfile = self.Properties.parfile pf = simlib.BaseName(parfile) newparpath = simlib.BuildPath(self.RestartDir, pf) newsspath = simlib.BuildPath(self.InternalDir, "SubmitScript") pbsSimulationName = restartlib.CreatePbsSimulationName(self) new_properties = dict() new_properties['SOURCEDIR'] = self.Properties.sourcedir new_properties['SIMULATION_NAME'] = self.SimulationName new_properties['SHORT_SIMULATION_NAME'] = pbsSimulationName new_properties['SIMULATION_ID'] = self.Properties.simulationid new_properties['RESTART_ID'] = self.RestartID new_properties['RUNDIR'] = self.RestartDir new_properties['SCRIPTFILE'] = self.Properties.submitscript new_properties['EXECUTABLE'] = self.Properties.executable new_properties['PARFILE'] = newparpath new_properties['HOSTNAME'] = hostname new_properties['USER'] = user new_properties['NODES'] = nodes new_properties['PROCS'] = procs new_properties['PROCS_REQUESTED'] = procs_requested new_properties['PPN'] = ppn new_properties['PPN_USED'] = ppn_used new_properties['NUM_PROCS'] = num_procs new_properties['NUM_THREADS'] = num_threads new_properties['MEMORY'] = memory new_properties['CPUFREQ'] = cpufreq new_properties['ALLOCATION'] = allocation new_properties['QUEUE'] = queue new_properties['EMAIL'] = machineEntry.email walltt = Walltime # always restrict our walltime to maxwalltime if requested walltime # is too large. if MaxWalltime.walltime_seconds < Walltime.walltime_seconds: walltt = MaxWalltime # okay, our walltime requested was too large # find out if we should use automatic job chaining. if chainedJobId == None: UseChaining = True new_properties['WALLTIME'] = walltt.Walltime new_properties['WALLTIME_HH'] = walltt.walltime_hh new_properties['WALLTIME_MM'] = walltt.walltime_mm new_properties['WALLTIME_SS'] = walltt.walltime_ss new_properties['WALLTIME_SECONDS'] = walltt.walltime_seconds new_properties['WALLTIME_MINUTES'] = walltt.walltime_minutes new_properties['WALLTIME_HOURS'] = walltt.walltime_hours new_properties['SIMFACTORY'] = simenv.EXECUTABLE new_properties['SUBMITSCRIPT'] = newsspath new_properties['SCRIPTFILE'] = newsspath if chainedRestartId == None: new_properties['FROM_RESTART_COMMAND'] = "" else: new_properties['FROM_RESTART_COMMAND'] = "--from-restart-id=%s" % chainedRestartId if chainedJobId == None: new_properties['CHAINED_JOB_ID'] = "" else: new_properties['CHAINED_JOB_ID'] = chainedJobId for key in new_properties.keys(): DefineDatabase.Set(key, new_properties[key]) self.SimulationLog.Write("Defined substituion properties for submission") self.SimulationLog.Write(new_properties) # lets prepare our submit script. submitscript_contents = DefineDatabase.SubAll(submitscript_contents) if chainedJobId != None: if submitscript_contents.count(chainedJobId) == 0: fatal("Machine %s currently does not support job chaining. Please modify the submission script to support job chaining." % simenv.LocalMachine) # store the rest of our keys rr = { 'nodes': nodes, 'ppnused': ppn_used, 'procs':procs, 'ppn':ppn, 'procsrequested': procs_requested, 'numprocs': num_procs, 'numthreads':num_threads, 'queue': queue, 'allocation': allocation, 'hostname': hostname, 'user': user, 'memory': memory, 'cpufreq': cpufreq, 'pbsSimulationName': pbsSimulationName, 'walltime': Walltime.Walltime } if chainedJobId == None: rr['chainedjobid'] = "-1" else: rr['chainedjobid'] = str(chainedJobId) # cast this as dict() to silence a pychecker warning. for key in dict(rr).keys(): self.Properties.AddProperty(key, rr[key]) # write to our new properties directory. self.Properties.Filename = simlib.BuildPath(self.InternalDir, "properties.ini") info("writing to internalDir: %s" % self.InternalDir) self.Properties.Save() self.SimulationLog.Write("self.Properties: %s" % self.Properties.Filename) self.SimulationLog.Write(self.Properties.toString()) info("saving substituted submitscript contents to: %s" % newsspath) self.SimulationLog.Write("saving substituted submitscript contents to: %s" % newsspath) simlib.WriteContents(newsspath, submitscript_contents) # make executable! os.chmod(newsspath, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) self.makeScratchdir(DefineDatabase) submitCommand = DefineDatabase.SubAll(machineEntry.submit) self.SimulationLog.Write("Executing submission command: %s" % submitCommand) display("Executing submit command: %s" % submitCommand) # we only make the simulation active if this isn't a chained job. if chainedJobId == None: self.makeActive() output = simlib.ExecuteCommand(submitCommand, output=True) self.mark("simulation") job_id = self.extractJobId(output) self.SimulationLog.Write("After searching raw output, it was determined that the job_id is: %s" % job_id) self.SimulationLog.Write("If this is -1, that means the regex did NOT match anything. No job_id means no control.") self.Properties.AddProperty('jobid', job_id) self.Properties.Save() if job_id == "-1": warning("submit either failed or could not determine job id, output:") warning(output) return else: display("Submit finished, job id is %s" % job_id) self.JobID = job_id os.environ['PBS_JOBID'] = self.JobID # have to make sure chainedJobId is None, otherwise this is submit() is actually handling a preSubmit() call. if UseChaining and chainedJobId == None: numChains = restartlib.GetNumberOfRestarts(MaxWalltime, Walltime) restart = None previousRestart = None # we already submitted the first chain while numChains > 0: if restart != None: previousRestart = restart restart = SimRestart() restart.load(self.SimulationName) if previousRestart == None: restart.preSubmit(self.SimulationName, self.JobID, self.RestartID) else: restart.preSubmit(self.SimulationName, previousRestart.JobID, previousRestart.RestartID) numChains = numChains - 1 self.SimulationLog.Write("Simulation %s, with restart id %s, and job id %s has been submitted" % (self.SimulationName, self.RestartID, self.JobID)) def GetRestartId(self): rids = restartlib.GetRestartIds(self) if len(rids) > 0: max_restart_id = rids[len(rids)-1] else: max_restart_id = -1 if simenv.OptionsManager.HasOption("restart-id"): ii = simenv.OptionsManager.GetOption("restart-id") try: my_restart_id = int(ii) except ValueError: fatal("Could not coerce provided restart-id %s into an integer" % ii) else: my_restart_id = max_restart_id + 1 if not(simenv.OptionsManager.HasOption("restart-id")): if my_restart_id in rids: fatal("assigned restart id %s for simulation %s already in use!" % my_restart_id) info("Assigned restart_id of: %04d" % my_restart_id) if my_restart_id > 9999: fatal("maximum number of restarts reached. Please use sim purge to clear existing restarts") if max_restart_id == my_restart_id: max_restart_id = max_restart_id - 1 return (rids, max_restart_id, my_restart_id) def interactive(self): DefineDatabase = simsubs.DefineDatabase() # need basedir, simulationdir, and internaldir machineEntry = simenv.LocalMachineEntry # need to prepare replacements. MaxWalltime = restartlib.WallTime(machineEntry.maxwalltime) if not(simenv.OptionsManager.HasOption('walltime')): Walltime = MaxWalltime else: Walltime = restartlib.WallTime(simenv.OptionsManager.GetOption('walltime')) hostname = machineEntry.hostname user = simlib.GetUsername() memory = simlib.GetMachineOption('memory') cpufreq = simlib.GetMachineOption('cpufreq') allocation = simlib.GetMachineOption('allocation') queue = simlib.GetMachineOption('queue') (nodes, ppn_used, procs, ppn, procs_requested, num_procs, num_threads) = simlib.GetProcs(None) pbsSimulationName = restartlib.CreatePbsSimulationName(self) new_properties = dict() new_properties['HOSTNAME'] = hostname new_properties['USER'] = user new_properties['SHORT_SIMULATION_NAME'] = pbsSimulationName new_properties['NODES'] = nodes new_properties['PROCS'] = procs new_properties['EMAIL'] = machineEntry.email new_properties['PROCS_REQUESTED'] = procs_requested new_properties['PPN'] = ppn new_properties['PPN_USED'] = ppn_used new_properties['NUM_PROCS'] = num_procs new_properties['NUM_THREADS'] = num_threads new_properties['MEMORY'] = memory new_properties['CPUFREQ'] = cpufreq new_properties['ALLOCATION'] = allocation new_properties['QUEUE'] = queue new_properties['WALLTIME'] = Walltime.Walltime new_properties['WALLTIME_HH'] = Walltime.walltime_hh new_properties['WALLTIME_MM'] = Walltime.walltime_mm new_properties['WALLTIME_SS'] = Walltime.walltime_ss new_properties['WALLTIME_SECONDS'] = Walltime.walltime_seconds new_properties['WALLTIME_MINUTES'] = Walltime.walltime_minutes new_properties['WALLTIME_HOURS'] = Walltime.walltime_hours new_properties['SIMFACTORY'] = simenv.EXECUTABLE for key in new_properties.keys(): DefineDatabase.Set(key, new_properties[key]) display("Entering interactive mode") interactivecmd = DefineDatabase.SubAll(machineEntry.interactivecmd) simlib.ExecuteCommand(interactivecmd) def userRun(self, simulationName, debug=False): self.initRestart(simulationName) assert(self.Initialized) machineEntry = simenv.LocalMachineEntry # do setup that isn't done because this wasn't submitted. self.SimulationLog.Write("Creating new properties because this is an independant run, not a run following a submit") hostname = machineEntry.hostname user = simlib.GetUsername() memory = simlib.GetMachineOption('memory') cpufreq = simlib.GetMachineOption('cpufreq') pbsSimulationName = restartlib.CreatePbsSimulationName(self) existingProperties = self.GetPreviousProperties() (nodes, ppn_used, procs, ppn, procs_requested, num_procs, num_threads) = simlib.GetProcs(existingProperties) # store the rest of our keys rr = { 'nodes': nodes, 'ppnused': ppn_used, 'procs':procs, 'ppn':ppn, 'procsrequested': procs_requested, 'numprocs': num_procs, 'numthreads':num_threads, 'hostname': hostname, 'user': user, 'memory': memory, 'cpufreq': cpufreq, 'pbsSimulationName': pbsSimulationName } # cast this as a dict to silence a pychecker warning. for key in dict(rr).keys(): self.Properties.AddProperty(key, rr[key]) self.Properties.Filename = simlib.BuildPath(self.RestartDir, simenv.INTERNALDIRECTORY, "properties.ini") self.Properties.Save() self.SimulationLog.Write("Determined the following properties") self.SimulationLog.Write(self.Properties.toString()) self.makeActive() self.run(debug) def submitRun(self, simulationName, restartId): self.load(simulationName, restartId) assert(self.Initialized) if self.Properties.HasProperty('finished') and self.Properties.finished == "yes": fatal("cannot rerun a restart that has been finished.") # if we are following a previous restart, attempt to clean that previous restart up # and make this new restart active. if simenv.OptionsManager.HasOption("from-restart-id"): frid = simenv.OptionsManager.GetOption("from-restart-id") self.SimulationLog.Write("Following restart-id %s, finishing it." % frid) restart = SimRestart() restart.load(simulationName, frid) restart.finish() self.makeActive() self.run() def GenerateNodefile(self): mpi_nodefile = simlib.BuildPath(self.RestartDir, 'NODELIST') info("mpi_nodefile: %s" % mpi_nodefile) if not(os.path.exists(mpi_nodefile)): nodes = simlib.GetUseNodes() if len(nodes) > 0: nodelist = "\n".join(nodes) else: nodefiles = ['PE_HOSTFILE', 'PBS_NODEFILE'] found = False for nf in nodefiles: if os.environ.has_key(nf): contents = simlib.GetFileContents(os.environ[nf]) nodelist = contents.split("\n") found = True if not found: nodelist = ['localhost'] info("mpi nodelist: %s" % nodelist) simlib.WriteContents(mpi_nodefile, '%s\n' % "\n".join(nodelist)) self.SimulationLog.Write("mpi_nodefile did not exist, so we attempted to make it. The contents are below") self.SimulationLog.Write("\n".join(nodelist)) else: contents = simlib.GetFileContents(mpi_nodefile) info("mpi nodelist: %s" % contents.split("\n")) self.SimulationLog.Write("mpi nodelist existed, contents are below") self.SimulationLog.Write(contents) os.environ['PBS_NODEFILE'] = mpi_nodefile os.environ['PE_NODEFILE'] = mpi_nodefile def run(self, debug=False): assert(self.Initialized) assert(self.IsActive()) DefineDatabase = simsubs.DefineDatabase() display("Running simulation %s" % self.SimulationName) self.SimulationLog.Write("Prepping for execution/run") checkpointing = self.PrepareCheckpointing() if checkpointing: self.Properties.AddProperty("checkpointing", "yes") self.SimulationLog.Write("Checkpoint files found, recovering") else: self.Properties.AddProperty("checkpointing", "no") scratchdir = simlib.GetMachineOption('scratchdir') parname = simlib.FileBaseName(self.Properties.parfile) my_workdir = simlib.BuildPath(self.RestartDir, parname) if not(os.path.exists(my_workdir)): try: os.mkdir(my_workdir) except OSError, e: fatal("could not make working directory path \"%s\", %s" % (my_workdir, e)) self.GenerateNodefile() os.environ['CACTUS_STARTTIME'] = str(int(time.time())) # do parfile substitution parfile = self.Properties.parfile pf = simlib.BaseName(parfile) newparpath = simlib.BuildPath(self.RestartDir, pf) new_properties = dict() new_properties['SOURCEDIR'] = self.Properties.sourcedir new_properties['SIMULATION_NAME'] = self.SimulationName new_properties['SHORT_SIMULATION_NAME'] = self.Properties.pbsSimulationName new_properties['SIMULATION_ID'] = self.Properties.simulationid new_properties['RESTART_ID'] = self.RestartID new_properties['RUNDIR'] = self.RestartDir new_properties['SCRIPTFILE'] = self.Properties.submitscript new_properties['SUBMITSCRIPT'] = self.Properties.submitscript new_properties['EXECUTABLE'] = self.Properties.executable new_properties['PARFILE'] = newparpath new_properties['HOSTNAME'] = self.Properties.hostname new_properties['USER'] = self.Properties.user new_properties['NODES'] = self.Properties.nodes new_properties['PROCS'] = self.Properties.procs new_properties['PROCS_REQUESTED'] = self.Properties.procsrequested new_properties['PPN'] = self.Properties.ppn new_properties['PPN_USED'] = self.Properties.ppnused new_properties['NUM_PROCS'] = self.Properties.numprocs new_properties['NUM_THREADS'] = self.Properties.numthreads new_properties['MEMORY'] = self.Properties.memory new_properties['CPUFREQ'] = self.Properties.cpufreq if debug: new_properties['RUNDEBUG'] = 1 new_properties['DEBUGGER'] = simenv.OptionsManager.GetOption('debugger') else: new_properties['RUNDEBUG'] = 0 if self.Properties.HasProperty("walltime"): walltime_raw = self.Properties.walltime Walltime = restartlib.WallTime(walltime_raw) new_properties['WALLTIME'] = Walltime.Walltime new_properties['WALLTIME_HH'] = Walltime.walltime_hh new_properties['WALLTIME_MM'] = Walltime.walltime_mm new_properties['WALLTIME_SS'] = Walltime.walltime_ss new_properties['WALLTIME_SECONDS'] = Walltime.walltime_seconds new_properties['WALLTIME_MINUTES'] = Walltime.walltime_minutes new_properties['WALLTIME_HOURS'] = Walltime.walltime_hours # parfile. # make sure the parfile has the correct walltime in it DefineDatabase.AddReplacement("TerminationTrigger::max_walltime", Walltime.walltime_hours) else: DefineDatabase.AddReplacement("TerminationTrigger::max_walltime", "0") new_properties['SCRATCHDIR'] = scratchdir #new_properties['CHAINED_JOB_ID'] = '' self.SimulationLog.Write("Defined substituion properties for execution/run") self.SimulationLog.Write(new_properties) for key in new_properties.keys(): DefineDatabase.Set(key, new_properties[key]) contents = DefineDatabase.SubAll(simlib.GetFileContents(parfile)) simlib.WriteContents(newparpath, contents) self.Properties.Save() rs = simenv.OptionsManager.GetOption('runscript') if rs != None and os.path.exists(rs): runScript = rs else: if not(self.Properties.HasProperty('runscript')): fatal("no runscript defined for simulation %s" % self.SimulationName) runScript = self.Properties.runscript contents = simlib.GetFileContents(runScript) contents = DefineDatabase.SubAll(contents) preparedRunScript = simlib.BuildPath(self.InternalDir, 'RunScript') simlib.WriteContents(preparedRunScript, contents) os.chmod(preparedRunScript, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) cmd = preparedRunScript # change to RestartDir and then run the simulation. # TODO -- we're not supposed to do this apparently. os.chdir(self.RestartDir) self.SimulationLog.Write("Executing run command: %s" % cmd) simlib.ExecuteReplaceCommand(cmd) display("Simfactory Done at date: %s" % os.system("date")) def done(self): # this function cleans up the simulation. Specifically, it closes the simulation log file. self.SimulationLog.Close() #self.ClearMark() def create(self, simulationName, parfile): DefineDatabase = simsubs.DefineDatabase() # lets start here. #the calling app (sim/sim-job/sim-run), etc, will determine where simulationName, parfile come from self.SimulationName = simulationName self.Parfile = parfile if not(simlib.FileExists(self.Parfile)): fatal("Specified parfile %s does not exist or is not readable" % self.Parfile) configuration = restartlib.GetConfiguration() configPath = simlib.BuildPath(simenv.CONFIGS_PATH, configuration) (exe, submitScript, runScript) = restartlib.GetExecutable() optionlist = simlib.BuildPath(configPath, 'OptionList') config_id = simlib.GetFileContents(simlib.BuildPath(configPath, 'CONFIG-ID'), 'no-config-id') build_id = simlib.GetFileContents(simlib.BuildPath(configPath, 'BUILD-ID'), 'no-build-id') (self.BaseDir, self.SimulationDir, self.InternalDir, self.CacheDir) = restartlib.CreateRestartSkeleton(self.SimulationName) #now that we have the simulationdir made, lets attach our log self.SimulationLog = restartlib.SimulationLog(self) self.SimulationLog.Write("Creating simulation %s" % self.SimulationName) self.SimulationLog.Write("Simulation directory: %s" % self.SimulationDir) display("Skeleton Created") display("Job directory: \"%s\"" % self.SimulationDir) machine = simenv.LocalMachine machineEntry = simenv.LocalMachineEntry user = machineEntry.user email = machineEntry.email DefineDatabase.Set("USER", user) DefineDatabase.Set("EMAIL", email) DefineDatabase.Set("MACHINE", simenv.LocalMachine) localsourcebasedir = simlib.GetLocalSourceBaseDir() dirsuffix = simlib.GetDirSuffix(localsourcebasedir) sourcedir = simlib.BuildPath(localsourcebasedir, dirsuffix) #dirname == simulationdir # need to create a simulation id simulation_id = restartlib.CreateSimulationId(self.SimulationName) propertyFile = simlib.BuildPath(self.InternalDir, "properties.ini") self.Properties = simproperties.SimProperties(baseProperties={"finished": "no"}) self.Properties.init(propertyFile) self.Properties.AddProperty('machine', machine) self.Properties.AddProperty('simulationid', simulation_id) self.Properties.AddProperty('sourcedir', sourcedir) self.Properties.AddProperty('configuration', configuration) self.Properties.AddProperty('configid', config_id) self.Properties.AddProperty('buildid', build_id) self.Properties.Save() #roots = ['exe', 'cfg', 'run', 'par'] (exedir, cfgdir, rundir, pardir, datadir) = restartlib.CreateInternalDirs(self.InternalDir) # runscript # exe ef = simlib.BaseName(exe) exefile = simlib.BuildPath(exedir, ef) restartlib.CopyFileWithCaching(exe, exedir, simlib.BuildPath(self.CacheDir, 'exe')) os.chmod(exefile, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) display("Executable: \"%s\"" % exe) self.Properties.AddProperty('executable', exefile) # config cfgfile = simlib.BaseName(optionlist) contents = simlib.GetFileContents(optionlist) contents = DefineDatabase.SubAll(contents) cfgpath = simlib.BuildPath(cfgdir, cfgfile) simlib.WriteContents(cfgpath, contents) display("Option list: \"%s\"" % cfgpath) self.Properties.AddProperty('optionlist', simlib.BuildPath(cfgdir, cfgfile)) # submit script if submitScript != None: queuefile = simlib.BaseName(submitScript) contents = simlib.GetFileContents(submitScript) contents = DefineDatabase.SubAll(contents) submitpath = simlib.BuildPath(rundir, queuefile) simlib.WriteContents(submitpath, contents) display("Submit script: \"%s\"" % submitpath) self.Properties.AddProperty('submitscript', submitpath) else: self.Properties.AddProperty('submitscript', "None") # run script if runScript != None: runfile = simlib.BaseName(runScript) contents = simlib.GetFileContents(runScript) contents = DefineDatabase.SubAll(contents) runpath = simlib.BuildPath(rundir, runfile) simlib.WriteContents(runpath, contents) display("Run script: \"%s\"" % runpath) self.Properties.AddProperty('runscript', runpath) else: self.Properties.AddProperty('runscript', "None") # parfile par = simlib.BaseName(parfile) contents = simlib.GetFileContents(parfile) contents = DefineDatabase.SubAll(contents) parpath = simlib.BuildPath(pardir, par) simlib.WriteContents(parpath, contents) display("Parameter file: \"%s\"" % parpath) self.Properties.AddProperty('parfile', parpath) self.Properties.Save() self.SimulationLog.Write("Simulation Properties:") self.SimulationLog.Write(self.Properties.toString()) # data directory if simenv.OptionsManager.HasOption('datadir'): datasrc = simenv.OptionsManager.GetOption('datadir') if not(os.path.exists(datadir)): fatal("could not open data directory \"%s\" for reading" % datasrc) shutil.copytree(datasrc, datadir, True) display("Data Directory: \"%s\"" % datasrc) self.SimulationLog.Write("Simulation %s created" % self.SimulationName) def stop(self): DefineDatabase = simsubs.DefineDatabase() job_id = self.GetJobId() if job_id == -1: fatal("cannot stop a job without an associated job_id") self.SimulationLog.Write("Stopping simulation %s, with restart id %s, with job id %s" % (self.SimulationName, self.RestartID, job_id)) status = restartlib.GetJobStatus(job_id) if status == 'U': display("job %s already finished or stopped" % job_id) self.SimulationLog.Write("Job status determined to be 'U', unable to stop, or already stopped/finished") return machine = simlib.GetMachineName() machineEntry = simenv.ConfigurationDatabase.GetMachine(machine) stopcmd = machineEntry.stop DefineDatabase.Reset() DefineDatabase.Set("JOB_ID", job_id) DefineDatabase.Set("USER", machineEntry.user) # it might be a regex, make sure it's in python format. stopcmd = DefineDatabase.SubAll(stopcmd) force = simenv.OptionsManager.GetOption('force') term_file = simlib.BuildPath(self.RestartDir, 'TERMINATE') if os.path.exists(term_file) and not force: info("TERMINATE exists, stopping %s gracefully" % job_id) self.SimulationLog.Write("TERMINATE file exists for job_id %s, terminating gracefully" % job_id) # write 1 to term_file fptr = open(term_file, 'w+') fptr.write('1') fptr.close() self.finish() return display("Forcing %s to stop without using graceful termination" % job_id) simlib.ExecuteCommand(stopcmd) return def show_output(self): DefineDatabase = simsubs.DefineDatabase() (machine, machineEntry, sourceBaseDir) = simlib.GetLocalEnvironment() job_id = self.GetJobId() job_status = 'U' if job_id != -1: job_status = restartlib.GetJobStatus(job_id) DefineDatabase.Set("JOB_ID", job_id) parname = simlib.FileBaseName(self.Properties.parfile) workdir = simlib.BuildPath(self.RestartDir, parname) exechost = restartlib.GetExecHost(self) if exechost == None and job_status == 'R': fatal("could not retreive exechost for running simulation %s" % self.SimulationName) DefineDatabase.Set("EXECHOST", exechost) output = False stdout_output = "(file does not exist)" stderr_output = "(file does not exist)" formaline_output = "(file does not exist)" formaline_file = simlib.BuildPath(workdir, 'formaline-jar.txt') if os.path.exists(formaline_file): formaline_output = simlib.GetFileContents(formaline_file) if job_status == 'R': if simenv.OptionsManager.GetOption('follow'): followcmd = DefineDatabase.SubAll(machineEntry.GetKey('stdout-follow')) followcmd = "cd '%s' && { %s; }" % (self.RestartDir, followcmd) simlib.ExecuteReplaceCommand(followcmd) output = True else: stderrcmd = DefineDatabase.SubAll(machineEntry.GetKey('stderr')) stdoutcmd = DefineDatabase.SubAll(machineEntry.GetKey('stdout')) stdoutcmd = "cd '%s' && { %s; }" % (self.RestartDir, stdoutcmd) stderrcmd = "cd '%s' && { %s; }" % (self.RestartDir, stderrcmd) ff = os.popen(stdoutcmd) stdout_output = ff.read() ret = ff.close() if ret != None: warning("stdout command \"%s\" returned status %s" % (stdoutcmd, ret)) ff = os.popen(stderrcmd) stderr_output = ff.read() ret = ff.close() if ret != None: warning("stderr command \"%s\" returned status %s" % (stderr_output, ret)) output = True else: errfile = simlib.BuildPath(self.RestartDir, "%s.err" % self.SimulationName) outfile = simlib.BuildPath(self.RestartDir, "%s.out" % self.SimulationName) if os.path.exists(outfile): stdout_output = simlib.GetFileContents(outfile) if os.path.exists(errfile): stderr_output = simlib.GetFileContents(errfile) if not output: # Show stdout, stderr, and Formaline output sep = "=" * 80 display(sep) display("The job's Formaline output is:") display(sep) display(formaline_output) display(sep) display("The job's stdout is:") display(sep) display(stdout_output) display(sep) display("The job's stderr is:") display(sep) display(stderr_output) display(sep) def cleanup(self): # TODO: When is this routine called? It seems to always # perform a cleanup. # TODO: One could implement the actual cleanup in this # routine, and then have "finish" first check whether the # simulation should be cleaned up, and if so, call "cleanup". # This would separate the test-whether-to-clean-up and # clean-up logics, and would remove the need for the # "allowForce" flag. # allow force. self.finish(True) def finish(self, allowForce=False): self.SimulationLog.Write("For simulation %s, Finishing restart %s" % (self.SimulationName, self.LongRestartID)) # mark the simulation. self.mark() if self.Properties.HasProperty("finished") and self.Properties.finished == "yes": self.SimulationLog.Write("simulation %s, with restart %s, has already been finished" % (self.SimulationName, self.RestartID)) return if self.Properties.machine != simenv.LocalMachine: logonly("Error: cannot clean up a simulation created on machine %s from machine %s, skipping" % (self.Properties.machine, simenv.LocalMachine)) return # force is now an option. force = simenv.OptionsManager.GetOption('force') if not allowForce: force = False self.SimulationLog.Write("Force option: %s" % force) job_id = self.GetJobId() job_status = 'U' # if the job_id is -1, it means self.GetJobId was unable to determine a valid job id # eg, the simulation didn't actually submit itself. # if we have a job_id of -1, there's no point in attempting to get the job status # since it will always return 'U'. if job_id != -1: job_status = restartlib.GetJobStatus(job_id) self.SimulationLog.Write("Job ID: %s, Job Status: %s" % (job_id, job_status)) # The check to see if the job is active happens when it polls for the job # status using the function restartlib.GetJobStatus. If that returns anything # other than 'U', this job won't get cleaned up. if job_status == 'E' and not force: self.SimulationLog.Write("Job status is E, meaning the job was found in the queue, however, simfactory was unable to determine exactly what its status was.") self.SimulationLog.Write("In order to prevent cleaning up a simulation that may still be active, simfactory will leave this job alone.") warning("Job status is E: running/queued but unable to determine its exact status, leaving alone.") return if job_status != 'U' and not force: self.SimulationLog.Write("Job status is not 'U', it is '%s', and force is false, therefor this is a noop, no cleanup required" % job_status) return # clean up. self.SimulationLog.Write("Cleaning up simulation %s, restart %s, with job_status %s" % (self.SimulationName, self.RestartID, job_status)) # TODO: Before doing anything to the simulation, a log entry # needs to be written. # step 1. if this simulation is active and Force == False, the first thing we need to do is stop the job. if force: self.SimulationLog.Write("Forcing cleanup of an active simulation, stopping first") self.stop() # step 2. remove active flag active_dir = "%s-active" % self.RestartDir if os.path.exists(active_dir): try: os.unlink(active_dir) except: self.SimulationLog.Write("Unable to remove active_dir %s" % active_dir) fatal("cannot clean up simulation %s, restart id %s, unable to remove -active symlink %s" % (self.SimulationName, self.RestartID, active_dir)) # step 2. make terminate file not world writable, and fix other permissions if os.path.exists(self.RestartDir): try: os.chmod(self.RestartDir, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) except: self.SimulationLog.Write("unable to chmod %s to 755" % self.RestartDir) pass term_file = simlib.BuildPath(self.RestartDir, 'TERMINATE') if os.path.exists(term_file): try: os.chmod(term_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH) except: self.SimulationLog.Write("unable to chmod termination file %s, it might not exist" % term_file) pass outfile = simlib.BuildPath(self.RestartDir, "%s.out" % self.SimulationName) errfile = simlib.BuildPath(self.RestartDir, "%s.err" % self.SimulationName) files = [outfile, errfile] for f in files: if os.path.exists(f): try: os.chmod(f, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH) except: self.SimulationLog.Write("unable to chmod 644 %s, it may not exist" % f) pass # step 4. remove scratch dir machineEntry = simenv.LocalMachineEntry scratchdir = machineEntry.GetKey('scratchdir') # TODO: This looks like a logic error -- the scratch dir is # only removed if its path does not start with a slash. # TODO: This looks like a pattern -- calling BuildPath only if # a path does not start with a slash. This could be abstracted # into a function, e.g. "create-an-absolute-path-name". if not(scratchdir.startswith("/")): sspath = simlib.BuildPath(self.RestartDir, scratchdir) else: sspath = scratchdir if os.path.exists(sspath): shutil.rmtree(sspath, True) # step 5. remove half written checkpoint files try: os.system("find %s -name *.chkpt.tmp.it_*.* -exec rm -rf {} \;" % self.RestartDir) except: pass self.Properties.AddProperty("finished", "yes") self.Properties.Save() # step 6. Hard-link (Formaline) tarballs between different restarts # TODO: Why is this deactivated? Maybe it doesn't work # reliably... There should be a comment stating why it is # deactivated. I think Ian Hinder requested that feature; we # could ask him. # old_dir = os.getcwd() # os.chdir(self.SimulationDir) # # files = os.popen("find . -name \*.tar.gz" % self.RestartDir).read().split("\n") # # rids = restartlib.GetRestartIds(self) # rids.sort() # # for ff in files: # ff = ff.strip() # if len(ff) == 0: # continue # # if not(ff.endswith(".tar.gz")) or not(os.path.exists(ff)): # continue # # statinfo = os.stat(ff) # # if statinfo.st_nlink != 1: # continue # # # # for rid in rids.reverse(): # restart_path = simlib.BuildPath(self.SimulationDir, "output-%04d" % rid) # # if not(os.path.exists(restart_path)): # continue # # sspath = simlib.BuildPath(restart_path, file) # # if filecmp.cmp(sspath, ff) == 0: # os.link(sspath, ff) self.SimulationLog.Write("Simulation %s, restart %s, with job id %s has been successfully cleaned up" % (self.SimulationName, self.RestartID, job_id)) # TODO: The log entry should now state that the simulation has # been successfully deactivated. def archive(self): machineEntry = simenv.LocalMachineEntry archiveType = machineEntry.archivetype ArchiveEngine = simarchive.SimArchive(archiveType, self) ArchiveEngine.authenticate() ArchiveEngine.store() def GetJobId(self): # transitioning between having the jobid in two places. # look for the new place. if it has it in the properites file, # use it. otherwise, look for job.ini if self.Properties.HasProperty('jobid'): return self.Properties.jobid job_file = simlib.BuildPath(self.InternalDir, 'job.ini') if not(os.path.exists(job_file)): return -1 pjob = simproperties.SimProperties() pjob.init(job_file) return pjob.GetProperty("jobid") def attachLog(self, simulationdir): # noop pass return # all of these will be flushed out as i proceed. def trash(self): (machine, machineEntry, sourceBaseDir) = simlib.GetLocalEnvironment() basedir = simlib.GetBaseDir(machineEntry) trashPath = simlib.BuildPath(basedir, 'TRASH') if not(os.path.exists(trashPath)): try: os.system("mkdir -p %s" % trashPath) except OSError, e: fatal("could not make trash path \"%s\", %s" % (trashPath, e)) # okay, we have trash path. lets find out if any of our restarts are running. rids = restartlib.GetRestartIds(self) for rid in rids: rr = restartlib.GetRestartByRestartId(self.SimulationName, rid) job_id = rr.GetJobId() if job_id == -1: continue job_status = restartlib.GetJobStatus(job_id) if job_status != 'U': fatal("Error: Simulation %s, with Restart ID %s is either queued, holding, or running.\nJob ID %s must be stopped before a purge can happen." % (self.SimulationName, rid, job_id)) # okay, we have no running/queued/holding jobs. # lets move the simulation to the trash trashFolder = simlib.BuildPath(trashPath, self.Properties.simulationid) if not(os.path.exists(trashFolder)): try: os.system("mkdir -p %s" % trashFolder) except OSError, e: fatal("could not make trash folder \"%s\", %s" % (trashFolder, e)) shutil.move(self.SimulationDir, trashFolder) display("Simulation %s has been moved to trash folder %s" % (self.SimulationName, trashFolder)) return