#!/usr/bin/python

# Test Cactus and SimFactory on a set of machines

# This script "distribute" and its companion "distribute-watch" allow
# easy testing of SimFactory and its MDB entries on many systems. In
# the simplest case, you run "simfactory/bin/distribute", and it
# tests on the default set of machines. In particular, it:
# 1. syncs the current source tree to the machine (if not local),
# 2. configures and builds from scratch both a debug and a
#    production configuration,
# 3. submits a test job, and
# 4. waits until the test job has finished.
# It then cleans up the test job, submits and presubmits the same
# simulation a second time, waits for it again, and cleans up again.
#
# The options --no-sync, --no-build, --no-clean, --no-reconfig,
# --no-debug, and --no-submit fine-tune the behaviour in the obvious
# way. The options --configuration=..., --thornlist=..., --parfile=...,
# --walltime=..., and --sleeptime=... (the polling interval in
# seconds) are also available.
#
# Non-option arguments specify the set of machines on which the test
# should be performed. For example, I could use
#     ./simfactory/bin/distribute --no-clean redshift
# to build and submit quickly on my local laptop.
#
# [The default list of machines should really be configurable in a
# better way.]
#
# Since this script is designed to test multiple systems
# simultaneously, it executes in the background and redirects output
# for each system into a log file "log/MACHINE.out". Since it runs in
# the background, it cannot ask for passwords to log into remote
# systems. For Athena and Kraken, where one receives a token valid for
# twelve hours, I usually log in once just before calling "distribute"
# to obtain the token ahead of time.
#
#
#
# The companion script "distribute-watch" watches these log files:
#     ./simfactory/bin/distribute-watch
# This displays the status of the distribution on one line per system,
# and updates the display every minute. (It only looks at the log
# files and does not access the remote systems directly.)
#
# The output is sorted by time; running tests appear near the top, and
# finished tests appear near the bottom. "age" indicates the time at
# which the log file was last written to (to see when a test was last
# run, or whether a test has become stuck). "dur'n" is the duration
# for which the test was running (to judge the speed of a system).
# "size" is the size of the log file (also indicating whether a test
# has become stuck). For example, the final link stage on Abe and
# Lincoln can take up to an hour.
#
# The state "working..." for a system indicates that the test is
# running, and "[done]" that the test has finished. The outputs
# "[sim-debug]", "[sim]", and "[success]" indicate that building the
# debug configuration, building the production configuration, and
# running the test simulations succeeded. If these indicators are not
# present, then the corresponding tests failed.
#
# I use my own thorn list for my tests, which includes all of the
# Einstein Toolkit thorns, many non-public production thorns that my
# collaborators and I use, as well as a set of infrastructure and
# development thorns. (The default thorn list, overridable with
# --thornlist=..., is manifest/einsteintoolkit.th.) Currently, the
# fastest machine for my thorn list is Bethe, a workstation at
# Caltech, which completes the test in 23 minutes. Lonestar, the new
# HPC cluster at TACC, comes in second with 34 minutes. (The standard
# Einstein Toolkit thorn list should be significantly faster than
# this.)
#
#
#
# [Erik says:] Given that the set of systems to which we have access
# is so diverse (local workstations, local clusters, LONI, LSU, NERSC,
# TeraGrid, ...), there are always some systems which are not
# available or not functional. Furthermore, there is a host of
# transient reasons why a test may fail on a particular system. If a
# test fails, I usually just rerun this test without much analysis
# (and possibly with the --no-clean option, so that the previous build
# is continued where it broke); only if a test fails twice do I begin
# to analyse the problem in detail.
import getopt import math import os import re import time import subprocess import sys BASE_PATH = os.sep.join(os.path.abspath(__file__).split(os.sep)[:-2]) sys.path.append(os.path.join(BASE_PATH, "lib")) # Move the command line arguments away, so that SimFactory doesn't # look at them argv = sys.argv sys.argv = [] import simenv import simlib import libutil from libutil import * SimEnvironment = simenv.init(BASE_PATH, __file__, None, None) print ################################################################################ all_machines = [ "abe", "athena", # needs grid certificate "bd", "bethe", #LONI# "bluedawg", "bp", "carver", "croton", "curry", "damiana", #LONI# "ducky", "eric", "fermi", "franklin", "hlrb2", "hopper2", "kraken", # needs grid certificate #LONI# "lacumba", "lincoln", "lonestar", "longhorn", #LONI# "louie", #"mileva", # needs grid certificate #LONI# "neptune", "numrel02", "numrel05", "numrel06", #"numrel07", # makes the machine swap "numrel08", "numrel09", "numrel10", #LONI# "oliver", "orca", #LONI# "painter", #"pelican", # not available "philip", #"pople", # not set up yet #LONI# "poseidon", #LONI# "queenbee", "ranger", "requin", "s-kraken", "saw", #"steele", # not enough disk space "surveyor", #"tezpur", # can't rm -rf "vip", #LONI# "zeke", ] options, argv = getopt.getopt (argv[1:], "", ["sync", "no-sync", "build", "no-build", "clean", "no-clean", "reconfig", "no-reconfig", "submit", "no-submit", "debug", "no-debug", "configuration=", "thornlist=", "parfile=", "walltime=", "sleeptime="]) for x in argv: if re.search(r"^-", x): fatal("Unrecognised option \"%s\"" % x) def optbool (options, name, default=None): for option, value in options: if option == ("--%s" % name): return True if option == ("--no-%s" % name): return False assert default!=None return default def optstr (options, name, default=None): for option, value in options: if option == ("--%s" % name): return value assert default!=None return default do_sync = optbool (options, "sync" , 
True) do_build = optbool (options, "build" , True) do_clean = optbool (options, "clean" , True) do_reconfig = optbool (options, "reconfig", True) do_submit = optbool (options, "submit" , True) do_debug = optbool (options, "debug" , True) configuration = optstr (options, "configuration", "") thornlist = optstr (options, "thornlist", "manifest/einsteintoolkit.th") #configuration = optstr (options, "configuration", "empty") #thornlist = optstr (options, "thornlist", "simfactory/par/empty.th") #parfile = optstr (options, "parfile", "par/static_tov.par") #walltime = optstr (options, "walltime", "1:00:00") parfile = optstr (options, "parfile", "recover.par") walltime = optstr (options, "walltime", "0:15:00") sleep_time = int(optstr (options, "sleeptime", "60")) #parfile = optstr (options, "parfile", "empty.par") #walltime = optstr (options, "walltime", "0:10:00") machines = argv if machines==[]: machines = all_machines logdir = "log" ################################################################################ def flatten(lists): return sum(lists, []) def create_timestamp(): return "[%s]" % time.strftime("%Y-%m-%d %H:%M:%S") def get_local_machine(): return simlib.GetMachineName() def get_remotes(machine): local_machine = get_local_machine() if local_machine != machine: return ["--remote", machine] else: return [] def get_mdb_entry(machine, key): description = simenv.ConfigurationDatabase.GetMachine(machine) assert description return description.GetKey(key) def info(msg=""): sys.stdout.flush() sys.stderr.flush() print "DISTRIBUTE:", create_timestamp() print "DISTRIBUTE:", msg sys.stdout.flush() def fatal(msg): sys.stdout.flush() sys.stderr.flush() print "DISTRIBUTE:", create_timestamp() print "DISTRIBUTE:", "Error:", msg print "DISTRIBUTE:", "Aborted." sys.stdout.flush() sys.exit(1) def done(): sys.stdout.flush() sys.stderr.flush() print "DISTRIBUTE:", create_timestamp() print "DISTRIBUTE:", "Done." 
sys.stdout.flush() sys.exit(0) def execute(command): info ("Executing: %s" % " ".join(command)) sys.stdout.flush() sys.stderr.flush() subprocess.call(command, bufsize=1, stdin=None, stdout=sys.stdout, stderr=sys.stderr) sys.stdout.flush() sys.stderr.flush() def get_output(command): info("Calling: %s" % " ".join(command)) sys.stdout.flush() sys.stderr.flush() p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) # Note: we can't read stdout and stderr sequentially, because this # could lead to a deadlock output = p.stdout.read() sys.stdout.flush() sys.stderr.flush() return output ################################################################################ def authenticate(machine): command = flatten([["./simfactory/bin/sim"], get_remotes(machine), ["execute", "true"]]) execute(command) def authenticate_access(machine): info("Authenticating access to %s:" % machine) authenticate(machine) def sync(machine): local_machine = get_local_machine() if local_machine != machine: command = ["./simfactory/bin/sim", "sync", machine] execute(command) def list_configurations(machine): command = flatten([["./simfactory/bin/sim"], get_remotes(machine), ["list-configurations"]]) execute(command) def build(machine, options): if configuration: if "--debug" in options: configurations = ["%s-debug" % configuration] else: configurations = [configuration] else: configurations = [] optionlist = get_mdb_entry(machine, "optionlist") submitscript = get_mdb_entry(machine, "submitscript") runscript = get_mdb_entry(machine, "runscript") optionlists = ["--optionlist=%s" % optionlist] if submitscript: submitscripts = ["--submitscript=%s" % submitscript] else: submitscripts = [] runscripts = ["--runscript=%s" % runscript] thornlists = ["--thornlist=%s" % thornlist] command = flatten([["./simfactory/bin/sim"], get_remotes(machine), ["build"], configurations, options, optionlists, submitscripts, runscripts, thornlists]) execute(command) def list_simulations(machine): 
command = flatten([["./simfactory/bin/sim"], get_remotes(machine), ["list-simulations"]]) execute(command) def get_status(machine): (path, file) = os.path.split(parfile) file = re.sub(r"\.[^.]*$", "", file) # remove suffix simulation = "%s-%s-%s" % (file, machine, date) # Examine the simulation status command = flatten([["./simfactory/bin/sim"], get_remotes(machine), ["list-simulations", simulation]]) output = get_output(command) if re.search(r"^ +%s .*INACTIVE" % simulation, output, re.MULTILINE): return 'I' if re.search(r"^ +%s .*QUEUED" % simulation, output, re.MULTILINE): return 'Q' if re.search(r"^ +%s .*RUNNING" % simulation, output, re.MULTILINE): return 'R' if re.search(r"^ +%s .*FINISHED" % simulation, output, re.MULTILINE): return 'F' return 'E' def create_submit(machine): (path, file) = os.path.split(parfile) file = re.sub(r"\.[^.]*$", "", file) # remove suffix simulation = "%s-%s-%s" % (file, machine, date) status = get_status(machine) if status != 'E': fatal("Simulation exist before it is created") memory = float(get_mdb_entry(machine, "memory")) num_threads = int(get_mdb_entry(machine, "num-threads")) ppn = int(get_mdb_entry(machine, "ppn")) max_nodes = int(get_mdb_entry(machine, "nodes")) want_memory = 4.0 # minimum, in GByte want_nodes = 2 # minimum nodes = max(min(want_nodes, max_nodes), int(math.ceil(want_memory / memory))) procs = nodes * ppn # In Python 2.5, this could be inlined with # # ["--config", configuration] if configuration else [] # # but we need to support Python 2.3 where this construct does not # exist. 
if configuration: config_opts = ["--config", configuration] else: config_opts = [] # Create and submit command = flatten([["./simfactory/bin/sim"], get_remotes(machine), ["create-submit", simulation, "--parfile=%s" % parfile, "--walltime=%s" % walltime, "--procs=%s" % procs, "--num-threads=%s" % num_threads], config_opts]) execute(command) status = get_status(machine) if status not in ['Q', 'R', 'F']: fatal ("Simulation is not active after it has been submitted") def submit(machine): (path, file) = os.path.split(parfile) file = re.sub(r"\.[^.]*$", "", file) # remove suffix simulation = "%s-%s-%s" % (file, machine, date) status = get_status(machine) if status not in ['I', 'F']: fatal("Simulation is not inactive or finished before it is submitted") # Submit command = flatten([["./simfactory/bin/sim"], get_remotes(machine), ["submit", simulation]]) execute(command) status = get_status(machine) if status not in ['Q', 'R', 'F']: fatal("Simulation is not active after it has been submitted") def presubmit(machine): (path, file) = os.path.split(parfile) file = re.sub(r"\.[^.]*$", "", file) # remove suffix simulation = "%s-%s-%s" % (file, machine, date) status = get_status(machine) if status not in ['Q', 'R', 'F']: fatal("Simulation is not active before it is presubmitted") # Submit command = flatten([["./simfactory/bin/sim"], get_remotes(machine), ["submit", simulation]]) execute(command) status = get_status(machine) if status not in ['Q', 'R', 'F']: fatal("Simulation is not active after it has been submitted") def show_output(machine): (path, file) = os.path.split(parfile) file = re.sub(r"\.[^.]*$", "", file) # remove suffix simulation = "%s-%s-%s" % (file, machine, date) command = flatten([["./simfactory/bin/sim"], get_remotes(machine), ["show-output", simulation]]) execute(command) def check_output(machine): (path, file) = os.path.split(parfile) file = re.sub(r"\.[^.]*$", "", file) # remove suffix simulation = "%s-%s-%s" % (file, machine, date) command = 
flatten([["./simfactory/bin/sim"], get_remotes(machine), ["show-output", simulation]]) output = get_output(command) if re.search(r"^Done\.$", output, re.MULTILINE): return 'S' return 'E' def wait(machine): (path, file) = os.path.split(parfile) file = re.sub(r"\.[^.]*$", "", file) # remove suffix simulation = "%s-%s-%s" % (file, machine, date) my_sleep_time = sleep_time max_sleep_time = my_sleep_time # seconds while True: # Examine the simulation status info("Examining simulation status...") status = get_status(machine) if status not in ['I', 'Q', 'R', 'F']: fatal("Simulation is neither inactive nor active") if status in ['I', 'F']: info("Simulation is not active any more.") sys.stdout.flush() break # Wait info("Waiting for %d seconds..." % my_sleep_time) sys.stdout.flush() sys.stderr.flush() time.sleep(my_sleep_time) my_sleep_time = min(max_sleep_time, my_sleep_time*2) # Wait a bit for things to settle down info("Waiting for another %d seconds..." % my_sleep_time) time.sleep(my_sleep_time) # Examine the simulation output info("Examining simulation output...") status = check_output(machine) if status == 'S': info("Simulation finished successfully.") else: info("Simulation aborted.") sys.stdout.flush() def cleanup(machine): (path, file) = os.path.split(parfile) file = re.sub(r"\.[^.]*$", "", file) # remove suffix simulation = "%s-%s-%s" % (file, machine, date) # Ensure simulation is active and finished (i.e. ready to be # cleaned up) status = get_status (machine) if status not in ['I', 'F']: fatal("Simulation is not inactive or finished before cleanup") # Clean up command = flatten([["./simfactory/bin/sim"], get_remotes(machine), ["cleanup", simulation]]) execute(command) # Ensure simulation is inactive status = get_status (machine) if status != 'I': fatal("Simulation is not inactive after cleanup") def sync_build_submit(machine): info("Distributing to %s..." 
% machine) if do_sync: info("Synchronising...") sync(machine) # Give the file system some time to settle down time.sleep(3) info info("Current configurations:") list_configurations(machine) if do_build: if do_debug: info("Building debug configuration...") build(machine, build_options + ["--debug"]) info("Building optimised configuration...") build(machine, build_options) info("Current configurations:") list_configurations(machine) info("Current simulations:") list_simulations(machine) if do_submit: info("Submitting test job...") create_submit(machine) info("Waiting for test job...") wait(machine) info("Cleaning up test job...") cleanup(machine) info("Submitting a second test job...") submit(machine) info("Presubmitting second test job...") presubmit(machine) info("Waiting for second test job...") wait(machine) info("Waiting for %d seconds..." % sleep_time) time.sleep(sleep_time) info("Waiting for second test job again...") wait(machine) info("Cleaning up second test job...") cleanup(machine) info("Current simulations:") list_simulations(machine) info("Simulation output:") show_output(machine) ################################################################################ print "Distributing the Cactus source tree" # # Check local system consistency # if not os.path.exists(".svn"): # print # print "This program has either been called from the wrong directory" # print "or on the wrong system. Aborting." 
# sys.exit(1) # Set up the tasks print print " %s target machines: %s" % (len(machines), " ".join(machines)) date = time.strftime("%Y.%m.%d-%H.%M.%S") if do_sync: print " - synchronising (if not local)" else: print " - not synchronising" build_options = [] if do_build: if do_clean: build_options += ["--clean"] if do_reconfig: build_options += ["--reconfig"] print " - building with options %s" % " ".join(build_options) else: print " - not building" if do_submit: print " - submitting parfile %s with walltime %s" % (parfile, walltime) else: print " - not submitting" sys.stdout.flush() # Authenticate access #print #for machine in machines: # authenticate_access(machine) # Perform the tasks # # Create a new process and exit the current process, so that the new # # process is detached from the console # pid = os.fork() # if pid>0: # sys.exit(0) # # Wait for the parent to exit # time.sleep(3) print machine_sleep_time = 0 # seconds next_machine_sleep_time = 2 # seconds for machine in machines: time.sleep(machine_sleep_time) machine_sleep_time = next_machine_sleep_time print "Scheduling distribution to %s..." 
% machine, sys.stdout.flush() # Create a subprocess for the task pid = os.fork() if pid==0: # # Create a new process and exit the current process, so that # # the new process is detached from the console # pid = os.fork() # if pid>0: # sys.exit(0) # # Wait for the parent to exit # time.sleep(3) # # Close stdin, so that ssh won't ask for passwords # sys.stdin.close() # # Redirect stdin, so that ssh won't ask for passwords # sys.stdin = open("/dev/null", "r") # Redirect stdout and stderr to a log file try: os.mkdir(logdir) except: pass sys.stdout = open(os.path.join(logdir, "%s.out" % machine), "w") sys.stderr = sys.stdout # Unset the DISPLAY environment variable to avoid X11 windows # asking for ssh credentials try: del os.environ["DISPLAY"] # del os.environ["SSH_ASKPASS"] # del os.environ["SSH_TTY"] except: pass # Execute the task sync_build_submit(machine) # Exit the subprocess done() print " (pid %s)" % pid print print "Done."