#! /usr/bin/python

# Test Cactus and SimFactory on a set of machines

# This script "distribute" and its companion "distribute-watch" allow
# easy testing of SimFactory and its MDB entries on many systems. In
# the simplest case, you run "simfactory/bin/distribute", and this
# tests on the default set of machines. In particular, it:
# 1. syncs the current source tree to the machine (if not local),
# 2. configures and builds from scratch both a debug and a
#    production configuration,
# 3. runs the testsuite,
# 4. submits a test job, and
# 5. waits until the test job has finished.
#
# The options --no-sync, --no-build, --no-clean, --no-reconfig,
# --no-testsuite, and --no-submit fine-tune the behaviour in the
# obvious way. The options --configuration=..., --thornlist=...,
# --parfile=..., and --walltime=... are also available.
#
# Non-option arguments specify the set of machines on which the test
# should be performed. For example, I could use
#     ./simfactory/bin/distribute --no-clean redshift
# to build and submit quickly on my local laptop.
#
# [The default list of machines should really be configurable in a
# better way.]
#
# Since this script is designed to test multiple systems
# simultaneously, it executes in the background and redirects output
# for each system into a log file "log/MACHINE.out". Since it runs in
# the background, it cannot ask for passwords to log into remote
# systems. For Athena and Kraken, where one receives a token valid for
# twelve hours, I usually log in once just before calling "distribute"
# to obtain the token ahead of time.
#
#
#
# The companion script "distribute-watch" watches these log files:
#     ./simfactory/bin/distribute-watch
# This displays the status of the distribution on one line per system,
# and updates the display every minute. (It only looks at the log
# files and does not access the remote systems directly.)
#
# The output is sorted by time; running tests appear near the top, and
# finished tests appear near the bottom. "age" indicates the time at
# which the log file was last written to (to see when a test was last
# run, or whether a test has become stuck). "dur'n" is the duration
# for which the test was running (to judge the speed of a system).
# "size" is the size of the log file (also indicating whether a test
# has become stuck). For example, the final link stage on Abe and
# Lincoln can take up to an hour.
#
# The state "working..." for a system indicates that the test is
# running, and "[done]" that the test has finished. The outputs
# "[sim-debug]", "[sim]", and "[success]" indicate that building the
# debug configuration, building the production configuration, and
# running the test simulations succeeded. If these indicators are not
# present, then the corresponding tests failed.
#
# I use my own thorn list for my tests, which includes all of the
# Einstein Toolkit thorns, many non-public production thorns that my
# collaborators and I use, as well as a set of infrastructure and
# development thorns. The currently fastest machine for this thorn
# list is Bethe, a workstation at Caltech, which completes the test in
# 23 minutes. Lonestar, the new HPC cluster at TACC, comes in second
# with 34 minutes. (The standard Einstein Toolkit thorn list should be
# significantly faster than this.)
#
#
#
# [Erik says:] Given that the set of systems to which we have access
# is so diverse (local workstations, local clusters, LONI, LSU, NERSC,
# TeraGrid, ...), there are always some systems which are not
# available or not functional.
# Furthermore, there is a host of transient reasons why a test may
# fail on a particular system. If a test fails, I usually just rerun
# the test without much analysis (possibly with the --no-clean option,
# so that the previous build is continued where it broke); only if a
# test fails twice do I begin to analyse the problem in detail.

# TODO:
# 1. use "dist" as default configuration, not "sim"
# 2. use different, built-in thornlist by default
# 3. handle "memory=0" gracefully
# 4. output simulation status (after "examining simulation status")
# 5. output (remote) stdout file name when a simulation aborted
# 6. examine testsuite output (and handle ignore-fail test cases)

import getopt
import math
import os
import re
import subprocess
import sys
import time

BASE_PATH = os.sep.join(os.path.abspath(__file__).split(os.sep)[:-2])
sys.path.append(os.path.join(BASE_PATH, "lib"))

# Move the command line arguments away, so that SimFactory doesn't
# look at them
argv = sys.argv
sys.argv = []

import simenv
import simlib
import libutil
from libutil import *

SimEnvironment = simenv.init(BASE_PATH, __file__, None, None)
print

################################################################################

all_machines = [
    "bethe",
    "bluewaters",
    "carver",
    "datura",
    "gordon",
    # "gpc",      # not enough disk space
    "hopper",
    "lonestar",
    "mike",
    "nvidia",
    "orca",
    "pandora",
    "philip",
    "shc",
    "shelob",
    "stampede",
    "stampede-mic",
    # "supermuc", # not enough disk space
    # "titan",    # will soon be retired
    "trestles",
    "vesta",
    "zwicky",
]

options, argv = getopt.getopt(
    argv[1:], "",
    ["sync", "no-sync", "build", "no-build", "clean", "no-clean",
     "reconfig", "no-reconfig", "submit", "no-submit",
     "testsuite", "no-testsuite", "benchmark", "no-benchmark",
     "submit-benchmark", "no-submit-benchmark", "recover", "no-recover",
     "debug", "no-debug", "aligned", "no-aligned",
     "configuration=", "configuration-options=", "thornlist=",
     "submit-parfile=", "submit-walltime=", "testsuite-walltime=",
     "benchmark-walltime=", "recover-parfile=", "recover-walltime=",
     "sleeptime="])
for x in argv:
    if re.search(r"^-", x):
        fatal("Unrecognised option \"%s\"" % x)

def optbool(options, name, default=None):
    for option, value in options:
        if option == ("--%s" % name):
            return True
        if option == ("--no-%s" % name):
            return False
    assert default!=None
    return default

def optfloat(options, name, default=None):
    for option, value in options:
        if option == ("--%s" % name):
            return float(value)
    assert default!=None
    return default

def optstr(options, name, default=None):
    for option, value in options:
        if option == ("--%s" % name):
            return value
    assert default!=None
    return default

do_sync             = optbool(options, "sync"            , True)
do_build            = optbool(options, "build"           , True)
do_clean            = optbool(options, "clean"           , True)
do_reconfig         = optbool(options, "reconfig"        , True)
do_submit           = optbool(options, "submit"          , True)
do_testsuite        = optbool(options, "testsuite"       , True)
do_benchmark        = optbool(options, "benchmark"       , True)
do_submit_benchmark = optbool(options, "submit-benchmark", True)
do_recover          = optbool(options, "recover"         , True)
do_debug            = optbool(options, "debug"           , True)
do_aligned          = optbool(options, "aligned"         , False)

configuration         = optstr(options, "configuration", "")
configuration_options = optstr(options, "configuration-options", "")
thornlist             = optstr(options, "thornlist", "manifest/einsteintoolkit.th")
#configuration = optstr(options, "configuration", "empty")
#thornlist     = optstr(options, "thornlist", "simfactory/par/empty.th")

submit_parfile  = optstr(options, "submit-parfile", "submit.par")
#submit_parfile = optstr(options, "submit-parfile", "empty.par")
submit_walltime = optstr(options, "submit-walltime", "0:10:00")

testsuite_walltime = optstr(options, "testsuite-walltime", "4:00:00")

benchmark_walltime = optstr(options, "benchmark-walltime", "0:30:00")

#recover_parfile  = optstr(options, "recover-parfile", "par/static_tov.par")
#recover_walltime = optstr(options, "recover-walltime", "1:00:00")
recover_parfile  = optstr(options, "recover-parfile", "recover.par")
recover_walltime = optstr(options, "recover-walltime", "0:15:00")

# 10.0 is not sufficient e.g. for zwicky
sleep_time = optfloat(options, "sleeptime", 60.0)

machines = argv
if not machines:
    machines = all_machines

logdir = "log"

aligned_opt = "--replace=VECTORISE_ALIGNED_ARRAYS=yes"

build_options = []
if do_clean:
    build_options += ["--clean"]
if do_reconfig:
    build_options += ["--reconfig"]

################################################################################

def flatten(lists):
    return sum(lists, [])

def quote(s):
    return "'" + s.replace("'", r"'\''") + "'"

def create_timestamp():
    return "[%s]" % time.strftime("%Y-%m-%d %H:%M:%S")

def get_local_machine():
    return simlib.GetMachineName()

def get_remotes(machine):
    local_machine = get_local_machine()
    if local_machine == machine:
        return []               # ["--machine", machine]
    return ["--remote", machine]

def get_mdb_entry(machine, key):
    description = simenv.ConfigurationDatabase.GetMachine(machine)
    assert description
    return description.GetKey(key)

def info(msg=""):
    sys.stdout.flush()
    sys.stderr.flush()
    print "DISTRIBUTE:", create_timestamp()
    print "DISTRIBUTE:", msg
    sys.stdout.flush()

def log_result(msg):
    result_file.write("%s: %s\n" % (create_timestamp(), msg))
    result_file.flush()

def fatal(msg):
    sys.stdout.flush()
    sys.stderr.flush()
    print "DISTRIBUTE:", create_timestamp()
    print "DISTRIBUTE:", "Error:", msg
    print "DISTRIBUTE:", "Aborted."
    sys.stdout.flush()
    log_result("Error: %s" % msg)
    log_result("Aborted.")
    result_file.close()
    sys.exit(1)

def done():
    sys.stdout.flush()
    sys.stderr.flush()
    print "DISTRIBUTE:", create_timestamp()
    print "DISTRIBUTE:", "Done."
    sys.stdout.flush()
    log_result("Done.")
    result_file.close()
    sys.exit(0)

def execute(command):
    info("Executing: %s" % " ".join(command))
    sys.stdout.flush()
    sys.stderr.flush()
    subprocess.call(command, bufsize=1,
                    stdin=None, stdout=sys.stdout, stderr=sys.stderr)
    sys.stdout.flush()
    sys.stderr.flush()

def get_output(command):
    info("Calling: %s" % " ".join(command))
    sys.stdout.flush()
    sys.stderr.flush()
    p = subprocess.Popen(command,
                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # Note: we can't read stdout and stderr sequentially, because this
    # could lead to a deadlock
    output = p.stdout.read()
    sys.stdout.flush()
    sys.stderr.flush()
    return output

################################################################################

def authenticate(machine):
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["execute", "true"]])
    execute(command)

def authenticate_access(machine):
    info("Authenticating access to %s:" % machine)
    authenticate(machine)

def sync(machine):
    local_machine = get_local_machine()
    if local_machine != machine:
        command = ["./simfactory/bin/sim", "--verbose", "sync", machine]
        execute(command)

def list_configurations(machine):
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["list-configurations"]])
    execute(command)

def get_configuration_status(machine, configuration):
    # Examine the configuration status
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["list-configurations"]])
    output = get_output(command)
    if re.search(r"^ +%s .*built" % configuration, output, re.MULTILINE):
        return 'S'
    if re.search(r"^ +%s .*incomplete" % configuration, output, re.MULTILINE):
        return 'F'
    return 'E'

def build(machine, build_options):
    conf = "sim"
    if configuration:
        conf = configuration
    if aligned_opt in build_options:
        conf = "%s-aligned" % conf
    if "--debug" in build_options:
        conf = "%s-debug" % conf
    configurations = [conf]
    optionlist = get_mdb_entry(machine, "optionlist")
    submitscript = get_mdb_entry(machine, "submitscript")
    runscript = get_mdb_entry(machine, "runscript")
    optionlists = ["--optionlist=%s" % optionlist]
    submitscripts = []
    if submitscript:
        submitscripts = ["--submitscript=%s" % submitscript]
    runscripts = ["--runscript=%s" % runscript]
    thornlists = ["--thornlist=%s" % thornlist]
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["build", conf],
                       build_options, configuration_options.split(),
                       optionlists, submitscripts, runscripts, thornlists])
    execute(command)
    status = get_configuration_status(machine, conf)
    if status == 'S':
        log_result("build %s succeeded" % conf)
    elif status == 'F':
        log_result("build %s failed" % conf)
    else:
        fatal("Cannot determine status of configuration %s" % conf)

def list_simulations(machine):
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["list-simulations"]])
    execute(command)

def get_status(machine, simulation):
    # Examine the simulation status
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["list-simulations", simulation]])
    output = get_output(command)
    if re.search(r"^ +%s .*INACTIVE" % simulation, output, re.MULTILINE):
        return 'I'
    if re.search(r"^ +%s .*QUEUED" % simulation, output, re.MULTILINE):
        return 'Q'
    if re.search(r"^ +%s .*(PRESUBMITTED|RUNNING)" % simulation, output,
                 re.MULTILINE):
        return 'R'
    if re.search(r"^ +%s .*FINISHED" % simulation, output, re.MULTILINE):
        return 'F'
    return 'E'
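# The routine below chooses a process/thread layout from the MDB entries
# "num-threads", "ppn", and "nodes".  A worked example (the numbers are
# hypothetical, purely for illustration): with ppn=16, num-threads=8,
# nodes=100, and a requested procs=2, the arithmetic below yields
# nodes=1, cores=16, procs=2, threads=8, and ppn_used=16, i.e.
# "running 2 processes with 8 threads on 16 cores of 1 node".
# (The integer division is intentional.)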
def run_testsuite(machine, procs, build_options):
    conf = "sim"
    if configuration:
        conf = configuration
    if aligned_opt in build_options:
        conf = "%s-aligned" % conf
    if "--debug" in build_options:
        conf = "%s-debug" % conf
    sourcebasedir = simlib.GetLocalSourceBaseDir()
    sourcedir = simenv.CACTUS_PATH
    sourcesuffix = sourcedir.replace("%s/" % sourcebasedir, "")
    simulation = ("testsuite-%s-%s-%s-%s-procs%06d" %
                  (machine, sourcesuffix, conf, date, procs))
    status = get_status(machine, simulation)
    if status != 'E':
        fatal("Simulation %s exists before it is created" % simulation)
    max_threads = int(get_mdb_entry(machine, "num-threads"))
    ppn = int(get_mdb_entry(machine, "ppn"))
    max_nodes = int(get_mdb_entry(machine, "nodes"))
    nodes = max(1, procs / ppn)
    nodes = min(max_nodes, nodes)
    cores = nodes * ppn
    procs = min(cores, procs)
    threads = max(1, cores / procs)
    threads = min(max_threads, threads)
    ppn_used = procs * threads / nodes
    cores = nodes * ppn_used
    info(" Running %d processes with %d threads on %d cores of %d nodes" %
         (procs, threads, cores, nodes))
    config_opts = []
    if configuration:
        config_opts = ["--config", configuration]
    # Create and submit
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["create-submit", simulation,
                        "--testsuite",
                        "--configuration=%s" % conf,
                        "--walltime=%s" % testsuite_walltime,
                        "--procs=%d" % cores,
                        "--ppn-used=%d" % ppn_used,
                        "--num-threads=%d" % threads],
                       config_opts])
    execute(command)
    time.sleep(sleep_time)
    status = get_status(machine, simulation)
    if status not in ['Q', 'R', 'F']:
        fatal("Simulation %s is not active after it has been submitted" %
              simulation)
    return simulation

def examine_testsuite(machine, simulation):
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["show-output", simulation]])
    output = get_output(command)
    info("Output of testsuite %s:" % simulation)
    print output
    sys.stdout.flush()
    try:
        lines = output.splitlines()
        separator = 72*"="
        begin_summary = lines.index(separator) + 1
        end_summary = (lines[begin_summary:].index(" Tests passed:") +
                       begin_summary)
        summary = lines[begin_summary:end_summary]
        info("testsuite %s finished" % simulation)
        log_result("testsuite %s finished" % simulation)
        for line in summary:
            if line != "":
                print line
                log_result(line)
    except:
        info("ERROR: Could not run testsuite %s" % simulation)
        log_result("ERROR: Could not run testsuite %s" % simulation)

def examine_testsuites(machine, name, opts, sims, procs):
    entry = simenv.ConfigurationDatabase.GetMachine(machine)
    user = entry.user
    localsourcebasedir = simlib.GetLocalSourceBaseDir()
    localsourcedir = simenv.CACTUS_PATH
    sourcesuffix = localsourcedir.replace("%s/" % localsourcebasedir, "")
    conf = "sim"
    if configuration:
        conf = configuration
    if aligned_opt in opts:
        conf = "%s-aligned" % conf
    if "--debug" in build_options:
        conf = "%s-debug" % conf
    simulation = sims[0]
    sourcebasedir = simlib.GetSourceBaseDir(entry)
    sourcedir = "%s/%s" % (sourcebasedir, sourcesuffix)
    basedir = simlib.GetBaseDir1(entry)
    subcommand = ["simfactory/bin/test-archive.sh",
                  machine, sourcesuffix, conf, simulation,
                  sourcedir, basedir,
                  sims[0], "%d" % procs[0],
                  sims[1], "%d" % procs[1]]
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["execute", " ".join(map(quote, subcommand))]])
    execute(command)
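# The benchmark parameter space below is spanned by the divisors of the
# relevant MDB entries.  As an illustration (hypothetical MDB values, not
# taken from a real machine): with ppn=16, max-num-threads=8, and
# max-num-smt=2, the lists constructed below are ppn_used=[1,2,4,8,16],
# threads=[1,2,4,8], and smts=[1,2].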
def run_benchmarks(machine, build_options):
    conf = "sim"
    if configuration:
        conf = configuration
    if aligned_opt in build_options:
        conf = "%s-aligned" % conf
    if "--debug" in build_options:
        conf = "%s-debug" % conf
    sourcebasedir = simlib.GetLocalSourceBaseDir()
    sourcedir = simenv.CACTUS_PATH
    sourcesuffix = sourcedir.replace("%s/" % sourcebasedir, "")
    prefix = "benchmarks-%s-%s-%s-%s" % (machine, sourcesuffix, conf, date)
    # Cores per node
    min_ppn = int(get_mdb_entry(machine, "min-ppn"))
    ppn = int(get_mdb_entry(machine, "ppn"))
    ppn_used = [pu for pu in range(1, ppn+1) if ppn % pu == 0]
    # Number of threads
    max_threads = int(get_mdb_entry(machine, "max-num-threads"))
    threads = [thr for thr in range(1, max_threads+1) if max_threads % thr == 0]
    # Number of SMT threads
    max_smt = int(get_mdb_entry(machine, "max-num-smt"))
    smts = range(1, max_smt+1)
    # Memory per core
    node_memory = 1024*1024 * float(get_mdb_entry(machine, "memory"))
    memory = min(0.5 * 1024**3 * ppn, 0.5 * node_memory)
    cycles_sec = float(get_mdb_entry(machine, "cpufreq")) * 1.0e+9
    flop_cycle = float(get_mdb_entry(machine, "flop/cycle"))
    efficiency = float(get_mdb_entry(machine, "efficiency"))
    flop_per_second = flop_cycle * cycles_sec
    # Create parameter files
    subcommand = flatten([["simfactory/bin/bench-create.py",
                           "--path='%s'" % prefix,
                           "--prefix='%s'" % prefix,
                           "--machine='%s'" % machine,
                           "--configuration=%s" % conf,
                           "--memory=%g" % memory,
                           "--walltime=%g" % 300.0],
                          ["--levels=%d" % lev for lev in [1]],
                          ["--multipatch=%d" % mp for mp in [0]],
                          ["--hydro=%d" % hyd for hyd in [0]],
                          ["--flop-per-second=%g" % flop_per_second,
                           "--efficiency=%g" % efficiency,
                           "--min-cores=%d" % ppn, # 1,
                           "--max-cores=%d" % ppn],
                          ["--min-nodes=%d" % 1,
                           "--max-nodes=%d" % 1],
                          ["--threads=%d" % thr for thr in threads],
                          ["--smt=%d" % smt for smt in smts],
                          ["--ppn=%d" % ppn],
                          ["--ppn-used=%d" % pu for pu in ppn_used],
                          ["--run=%d" % run for run in [0]]])
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["execute", ' '.join(subcommand)]])
    execute(command)
    # Build, create, and submit
    if do_submit_benchmark:
        # Build
        buildcommand = ["sh", "%s/build.sh" % prefix]
        command = flatten([["./simfactory/bin/sim", "--verbose"],
                           get_remotes(machine),
                           ["execute", ' '.join(buildcommand)]])
        execute(command)
        # Create and submit
        subcommand = ["sh", "%s/submit.sh" % prefix]
        command = flatten([["./simfactory/bin/sim", "--verbose"],
                           get_remotes(machine),
                           ["execute", ' '.join(subcommand)]])
        execute(command)
        # Get list of simulations
        subcommand = ["cat", "%s/simulations.txt" % prefix]
        command = flatten([["./simfactory/bin/sim", "--verbose"],
                           get_remotes(machine),
                           ["execute", ' '.join(subcommand)]])
        output = get_output(command)
        simulations = output.splitlines()
        begin_sim = simulations.index('BEGIN_SIMULATIONS')
        end_sim = simulations.index('END_SIMULATIONS')
        simulations = simulations[begin_sim+1:end_sim]
    else:
        simulations = []
    print "There are %d benchmark simulations:" % len(simulations)
    for sim in simulations:
        print " %s" % sim
    if simulations:
        time.sleep(sleep_time)
    for sim in simulations:
        status = get_status(machine, sim)
        if status not in ['Q', 'R', 'F']:
            fatal("Simulation %s is not active after it has been submitted" %
                  sim)
    return (prefix, simulations)

def collect_benchmarks(machine, prefix, simulations):
    cycles_sec = float(get_mdb_entry(machine, "cpufreq")) * 1.0e+9
    flop_cycle = float(get_mdb_entry(machine, "flop/cycle"))
    ppn = int(get_mdb_entry(machine, "ppn"))
    # Collect results
    # TODO: use the auto-generated collect.sh for this
    subcommand = flatten([["echo BEGIN_TIMINGS;",
                           "for sim in $(grep -v _SIMULATIONS %s/simulations.txt); do" % prefix,
                           " awk '/^[^#]/ { print \"'$sim'\", $26; }' ~/simulations/$sim/output-0000/*/carpet::timing..asc 2>/dev/null |",
                           " tail -n 1;",
                           "done;",
                           "echo END_TIMINGS"]])
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["execute", ' '.join(subcommand)]])
    output = get_output(command)
    timings = output.splitlines()
    begin_tim = timings.index('BEGIN_TIMINGS')
    end_tim = timings.index('END_TIMINGS')
    timings = timings[begin_tim+1:end_tim]
    results = []
    for tim in timings:
        res = tim.split()
        sim = res[0]
        gpu_sec = float(res[1])
        m = re.search(r"nodes(\d+)", sim)
        nodes = int(m.group(1))
        if nodes<1:
            raise Exception("Could not determine number of nodes")
        if (gpu_sec > 0.0):
            # gpu_sec=0 if there is initial output, but no final output
            sec_gpu = nodes * ppn / gpu_sec
            results.append((sim, sec_gpu))
    results.sort(key=lambda x: x[1])
    print "There are %d benchmark results:" % len(results)
    print "[Units: CPU time in seconds per grid point update, lower is better,"
    print " Flop-equivalents per grid point update, lower is better]"
    for res in results:
        sim = res[0]
        sec_gpu = res[1]
        flop_gpu = flop_cycle * cycles_sec * sec_gpu
        print " %s: %g sec, %g flop" % (sim, sec_gpu, flop_gpu)
        log_result("%s: %g sec, %g flop" % (sim, sec_gpu, flop_gpu))
    if len(results) != len(simulations):
        print ("NOTE: %d benchmarks did not complete" %
               (len(simulations) - len(results)))
        # missing = list(set(simulations) - set(results))
        missing = {}
        for sim in simulations:
            missing[sim] = 1
        for res in results:
            sim = res[0]
            del missing[sim]
        missing = missing.keys()
        missing.sort()
        for sim in missing:
            print " %s" % sim
            log_result("%s: incomplete" % sim)
def create_submit(machine, build_options, parfile, walltime):
    conf = "sim"
    if configuration:
        conf = configuration
    if aligned_opt in build_options:
        conf = "%s-aligned" % conf
    if "--debug" in build_options:
        conf = "%s-debug" % conf
    (path, file) = os.path.split(parfile)
    file = re.sub(r"\.[^.]*$", "", file) # remove suffix
    simulation = "%s-%s-%s-%s" % (file, machine, conf, date)
    status = get_status(machine, simulation)
    if status != 'E':
        fatal("Simulation %s exists before it is created" % simulation)
    memory = float(get_mdb_entry(machine, "memory"))
    num_threads = int(get_mdb_entry(machine, "num-threads"))
    ppn = int(get_mdb_entry(machine, "ppn"))
    max_nodes = int(get_mdb_entry(machine, "nodes"))
    want_memory = 4.0           # minimum, in GByte
    want_nodes = 2              # minimum
    nodes = max(min(want_nodes, max_nodes),
                int(math.ceil(want_memory / memory)))
    procs = nodes * ppn
    info(" Running %d processes with %d threads on %d cores of %d nodes" %
         (procs / num_threads, num_threads, procs, nodes))
    # Create and submit
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["create-submit", simulation,
                        "--configuration=%s" % conf,
                        "--parfile=%s" % parfile,
                        "--walltime=%s" % walltime,
                        "--procs=%d" % procs,
                        "--num-threads=%d" % num_threads]])
    execute(command)
    time.sleep(sleep_time)
    status = get_status(machine, simulation)
    if status not in ['Q', 'R', 'F']:
        fatal("Simulation %s is not active after it has been submitted" %
              simulation)
    return simulation

def submit(machine, simulation):
    status = get_status(machine, simulation)
    if status not in ['I', 'F']:
        fatal("Simulation %s is not inactive or finished before it is submitted" %
              simulation)
    # Submit
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["submit", simulation]])
    execute(command)
    time.sleep(sleep_time)
    status = get_status(machine, simulation)
    if status not in ['Q', 'R', 'F']:
        fatal("Simulation %s is not active after it has been submitted" %
              simulation)

def presubmit(machine, simulation):
    status = get_status(machine, simulation)
    if status not in ['Q', 'R', 'F']:
        fatal("Simulation %s is not active before it is presubmitted" %
              simulation)
    # Submit
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["submit", simulation]])
    execute(command)
    time.sleep(sleep_time)
    status = get_status(machine, simulation)
    if status not in ['Q', 'R', 'F']:
        fatal("Simulation %s is not active after it has been submitted" %
              simulation)

def show_output(machine, simulation):
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["show-output", simulation]])
    execute(command)

def check_output(machine, simulation):
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["show-output", simulation]])
    output = get_output(command)
    if re.search(r"^Done\.$", output, re.MULTILINE):
        return 'S'
    return 'E'
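# wait() and wait_all() below poll the simulation status, starting with
# the configured --sleeptime and doubling the interval up to a cap of 60
# seconds.  For example (illustrative only): with --sleeptime=10 the
# successive waits are 10, 20, 40, 60, 60, ... seconds; with the default
# of 60 seconds the interval stays constant.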
def wait(machine, simulation):
    my_sleep_time = sleep_time
    max_sleep_time = 60.0       # seconds
    while True:
        # Examine the simulation status
        info("Examining simulation status...")
        status = get_status(machine, simulation)
        if status not in ['I', 'Q', 'R', 'F']:
            fatal("Simulation %s is neither inactive nor active" % simulation)
        if status in ['I', 'F']:
            info("Simulation %s is not active any more." % simulation)
            sys.stdout.flush()
            break
        # Wait
        info("Waiting for %g seconds..." % my_sleep_time)
        sys.stdout.flush()
        sys.stderr.flush()
        time.sleep(my_sleep_time)
        my_sleep_time = min(max_sleep_time, my_sleep_time*2)
    # Wait a bit for things to settle down
    info("Waiting for another %g seconds..." % sleep_time)
    time.sleep(sleep_time)
    # Examine the simulation output
    info("Examining simulation output...")
    status = check_output(machine, simulation)
    if status == 'S':
        info("Simulation %s finished successfully." % simulation)
    else:
        info("Simulation %s aborted." % simulation)
    sys.stdout.flush()

def wait_all(machine, simulations):
    my_sleep_time = sleep_time
    max_sleep_time = 60.0       # seconds
    # to_check = set(simulations)
    to_check = {}
    for sim in simulations:
        to_check[sim] = 1
    while to_check:
        # Examine the simulation status
        info("Examining simulation status...")
        # done = set()
        done = {}
        for sim in to_check:
            status = get_status(machine, sim)
            if status not in ['I', 'Q', 'R', 'F']:
                fatal("Simulation %s is neither inactive nor active" % sim)
            if status in ['I', 'F']:
                info("Simulation %s is not active any more." % sim)
                sys.stdout.flush()
                # done |= {sim}
                done[sim] = 1
        # to_check -= done
        for sim in done:
            del to_check[sim]
        # Wait
        info("Waiting for %g seconds..." % my_sleep_time)
        sys.stdout.flush()
        sys.stderr.flush()
        time.sleep(my_sleep_time)
        my_sleep_time = min(max_sleep_time, my_sleep_time*2)
    # Wait a bit for things to settle down
    if simulations:
        info("Waiting for another %g seconds..." % sleep_time)
        time.sleep(sleep_time)
    # Examine the simulation output
    info("Examining simulation output...")
    for sim in simulations:
        status = check_output(machine, sim)
        if status == 'S':
            info("Simulation %s finished successfully." % sim)
        else:
            info("Simulation %s aborted." % sim)
    sys.stdout.flush()

def cleanup(machine, simulation):
    # Ensure simulation is active and finished (i.e. ready to be
    # cleaned up)
    status = get_status(machine, simulation)
    if status not in ['I', 'F']:
        fatal("Simulation %s is not inactive or finished before cleanup" %
              simulation)
    # Clean up
    command = flatten([["./simfactory/bin/sim", "--verbose"],
                       get_remotes(machine),
                       ["cleanup", simulation]])
    execute(command)
    # Ensure simulation is inactive
    status = get_status(machine, simulation)
    if status != 'I':
        fatal("Simulation %s is not inactive after cleanup" % simulation)

def cleanup_all(machine, simulations):
    for sim in simulations:
        cleanup(machine, sim)

def sync_build_submit(machine):
    info("Distributing to %s..." % machine)

    if do_sync:
        info("Synchronising...")
        sync(machine)
        # Give the file system some time to settle down
        time.sleep(3)
    info()

    info("Current configurations:")
    list_configurations(machine)
    if do_build:
        confs = []
        if do_debug:
            confs += [("debug", ["--debug"])]
        confs += [("optimised", [])]
        if do_aligned:
            confs += [("aligned", [aligned_opt])]
        for (name,opts) in confs:
            info("Building %s configuration..." % name)
            build(machine, build_options + opts)
        info("Current configurations:")
        list_configurations(machine)
    info()

    info("Current simulations:")
    list_simulations(machine)
    if do_submit:
        confs = [("optimised", [])]
        if do_aligned:
            confs += [("aligned", [aligned_opt])]
        for (name,opts) in confs:
            info("Submitting %s test job..." % name)
            sim = create_submit(
                machine, build_options + opts, submit_parfile, submit_walltime)
            info("Waiting for %s test job..." % name)
            wait(machine, sim)
            info("Cleaning up %s test job..." % name)
            cleanup(machine, sim)
            info("Output of %s test job:" % name)
            show_output(machine, sim)
            status = check_output(machine, sim)
            if status == 'S':
                log_result("simulation %s succeeded" % sim)
            elif status == 'E':
                log_result("simulation %s failed" % sim)
            else:
                fatal("Cannot determine result of simulation %s" % sim)
        info("Current simulations:")
        list_simulations(machine)
    info()

    if do_testsuite:
        confs = [("optimised", [])]
        procs = [1, 2]
        if do_aligned:
            confs += [("aligned", [aligned_opt])]
        for (name,opts) in confs:
            info("Running %s testsuite..." % name)
            sims = [run_testsuite(machine, proc, build_options + opts)
                    for proc in procs]
            info("Waiting for testsuite %s..." % name)
            wait_all(machine, sims)
            info("Cleaning up testsuite %s..." % name)
            cleanup_all(machine, sims)
            info("Examining output of testsuite %s:" % name)
            for sim in sims:
                examine_testsuite(machine, sim)
            examine_testsuites(machine, name, opts, sims, procs)
        info("Current simulations:")
        list_simulations(machine)
    info()

    if do_benchmark:
        confs = [("optimised", [])]
        if do_aligned:
            confs += [("aligned", [aligned_opt])]
        for (name,opts) in confs:
            info("Running %s benchmarks..." % name)
            (prefix, sims) = run_benchmarks(machine, build_options + opts)
            info("Waiting for %s benchmarks..." % name)
            wait_all(machine, sims)
            info("Cleaning up %s benchmarks..." % name)
            cleanup_all(machine, sims)
            info("Results of %s benchmark:" % name)
            collect_benchmarks(machine, prefix, sims)
        info("Current simulations:")
        list_simulations(machine)
    info()
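    # The recovery test below creates and submits a job, waits for it,
    # cleans it up, then submits the same (cleaned-up) simulation again
    # and presubmits a further restart, presumably so that the later
    # restarts have to recover from the checkpoints written by the
    # earlier ones.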
    if do_recover:
        confs = [("optimised", [])]
        if do_aligned:
            confs += [("aligned", [aligned_opt])]
        for (name,opts) in confs:
            info("Submitting %s recovery job..." % name)
            sim = create_submit(
                machine, build_options + opts, recover_parfile, recover_walltime)
            info("Waiting for %s recovery job..." % name)
            wait(machine, sim)
            info("Cleaning up %s recovery job..." % name)
            cleanup(machine, sim)
            info("Submitting a second %s recovery job..." % name)
            submit(machine, sim)
            info("Presubmitting third %s recovery job..." % name)
            presubmit(machine, sim)
            info("Waiting for second and third %s recovery jobs..." % name)
            wait(machine, sim)
            info("Waiting for %g seconds..." % sleep_time)
            time.sleep(sleep_time)
            info("Waiting for second and third %s recovery jobs again..." % name)
            wait(machine, sim)
            info("Cleaning up second and third %s recovery jobs..." % name)
            cleanup(machine, sim)
            info("Output of %s simulation:" % name)
            show_output(machine, sim)
        info("Current simulations:")
        list_simulations(machine)
    info()

################################################################################

print "Distributing the Cactus source tree"

# # Check local system consistency
# if not os.path.exists(".svn"):
#     print
#     print "This program has either been called from the wrong directory"
#     print "or on the wrong system. Aborting."
#     sys.exit(1)

# Set up the tasks
print
print " %d target machines: %s" % (len(machines), " ".join(machines))
date = time.strftime("%Y.%m.%d-%H.%M.%S")
if do_sync:
    print " - synchronising (if not local)"
else:
    print " - not synchronising"
if do_aligned:
    print " - enforcing aligned arrays"
else:
    print " - not enforcing aligned arrays"
if do_build:
    print " - building with options %s" % " ".join(build_options)
else:
    print " - not building"
if do_submit:
    print (" - submitting simple job %s with walltime %s" %
           (submit_parfile, submit_walltime))
else:
    print " - not submitting simple job"
if do_testsuite:
    print " - running testsuite with walltime %s" % testsuite_walltime
else:
    print " - not running testsuite"
if do_benchmark:
    print " - running benchmark with walltime %s" % benchmark_walltime
else:
    print " - not running benchmark"
if do_recover:
    print (" - submitting recovery job %s with walltime %s" %
           (recover_parfile, recover_walltime))
else:
    print " - not submitting recovery job"
sys.stdout.flush()

# Authenticate access
#print
#for machine in machines:
#    authenticate_access(machine)

# Perform the tasks

# # Create a new process and exit the current process, so that the new
# # process is detached from the console
# pid = os.fork()
# if pid>0:
#     sys.exit(0)
# # Wait for the parent to exit
# time.sleep(3)

print
machine_sleep_time = 0      # seconds
next_machine_sleep_time = 2 # seconds
for machine in machines:
    time.sleep(machine_sleep_time)
    machine_sleep_time = next_machine_sleep_time

    print "Scheduling distribution to %s..." % machine,
    sys.stdout.flush()

    # Create a subprocess for the task
    pid = os.fork()
    if pid==0:
        # The following follows the recipe at
        # :
        # Detach from controlling terminal to make the child a session
        # leader
        os.setsid()
        # Create a new process and exit the current process, so that
        # the new process is no longer a session leader (i.e. it stays
        # detached from the console)
        pid = os.fork()
        if pid>0:
            os._exit(0)

        # Close all open files (in particular stdin, which we don't
        # want to steal from our grandparent)
        os.close(0)
        os.close(1)
        os.close(2)
        # Redirect stdin to /dev/null
        sys.stdin = open("/dev/null", "r")
        # Redirect stdout and stderr to a log file
        try:
            os.mkdir(logdir)
        except:
            pass
        log_file_name = os.path.join(logdir, "%s.out" % machine)
        try:
            os.remove(log_file_name)
        except:
            pass
        sys.stdout = open(log_file_name, "w")
        sys.stderr = sys.stdout
        result_file_name = os.path.join(logdir, "%s.results" % machine)
        result_file = open(result_file_name, "a")

        # Unset the DISPLAY environment variable to avoid X11 windows
        # asking for ssh credentials
        try:
            del os.environ["DISPLAY"]
            # del os.environ["SSH_ASKPASS"]
            # del os.environ["SSH_TTY"]
        except:
            pass

        # Output PID
        print "machine: %s" % machine
        print "pid: %d" % os.getpid()
        print
        log_result("=" * 40)
        log_result("Machine: %s" % machine)

        # Execute the task
        sync_build_submit(machine)

        # Exit the subprocess
        done()

    print " (pid %d)" % pid

print
print "Done."