Quickstart
==========

build/install the master_worker library

  # export SAGA_LOCATION=[/path/to/saga-cpp/install/]
  # make
  # make install

  -- installs libsaga_pm_master_worker.so into $SAGA_LOCATION/lib.


install the master/worker example codes

  # cd examples
  # make
  # make install      (requires permission to install to $SAGA_LOCATION)


run a local example

  # vim util.hpp
      edit the base advert URL
  # make install
  # cd examples/hello_world
  # vim mw_helloworld_master.cpp
      edit the worker description's resource manager URL to point to fork://localhost/
  # make install
  # ./mw_helloworld_master

  merzky@thinkie:~/saga/applications/master_worker/example/hello_world$ ./mw_helloworld_master
  hello world
  hello thinkie


NOTE: you will need a remotely accessible advert service to run distributed
workers!  The worker instances can run on local and/or remote hosts.  In the
remote case, you will also need to deploy both the library and the worker
code on the remote machine.  Take care to set up your environment so that the
code is actually runnable.  The easiest way is to ensure that saga-run.sh is
in your PATH (it lives in $SAGA_LOCATION/bin), and then to start the worker
with

  saga-run.sh mw_worker

The RSH example is as simple as hello_world:

  merzky@thinkie:~/saga/applications/master_worker/example/rsh$ ./mw_rsh_master
  worker rm : fork://localhost
  command   : hostname
  thinkie
  command   : quit


Master/Worker
=============

This library provides a very well known programming pattern: Master/Worker.
A master component spawns a number of workers, distributes items of work to
them, and collects the results, as needed.
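The pattern itself does not depend on SAGA.  As a purely local, hypothetical
sketch (not part of this library), the C++ code below uses threads as
stand-ins for workers and a mutex-protected task index as a stand-in for the
advert service: the master spawns a pool, each worker claims the next
unassigned task until none are left, and the master collects the results
once all workers have finished.  The names run_master, n_workers and work are
made up for illustration.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical local illustration of the master/worker pattern; none of
// these names belong to the saga_pm::master_worker API.  std::thread stands
// in for a (remote) worker, and the mutex-protected 'next' index stands in
// for the advert service through which tasks are handed out.
std::vector <std::string>
run_master (const std::vector <std::string>                         & tasks,
            std::size_t                                               n_workers,
            const std::function <std::string (const std::string &)> & work)
{
  std::vector <std::string> results (tasks.size ());
  std::mutex                mtx;
  std::size_t               next = 0;   // index of the next unassigned task

  // worker loop: claim the next task, work on it, store the result
  auto worker = [&] ()
  {
    for ( ;; )
    {
      std::size_t i;
      {
        std::lock_guard <std::mutex> lock (mtx);
        if ( next == tasks.size () )
          return;                       // nothing left: the worker quits
        i = next++;
      }
      results[i] = work (tasks[i]);     // each slot is written by one worker only
    }
  };

  // the master spawns the worker pool ...
  std::vector <std::thread> pool;
  for ( std::size_t w = 0; w < n_workers; w++ )
    pool.emplace_back (worker);

  // ... and collects the results once every worker has finished
  for ( std::size_t w = 0; w < pool.size (); w++ )
    pool[w].join ();

  return results;
}
```

For instance, run_master ({"a", "bb"}, 2, f) with an f that doubles its input
string yields {"aa", "bbbb"}, regardless of which worker picked up which task.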
Depending on the problem at hand, the master can

  - vary the size of the worker pool
  - vary the distribution of the worker pool
  - vary the size and nature of distributed tasks


Implementation Details
----------------------

In the scope of this implementation, we use the following terms:

  master: central coordinating component, which
          - controls worker distribution and lifetime
          - assigns tasks to workers

  worker: distributed stateful component, which
          - receives tasks from the master,
          - works on those tasks,
          - and returns results.

  task:   unit of work to be distributed to a worker

The different components obviously need some means to communicate: to
exchange state information, tasks and results.  This implementation uses
SAGA's advert service to provide that communication in a centralized and
persistent way.

The master will create a directory for each application instance.  Each
worker will, once spawned, register itself in that directory by creating a
worker advert.  That worker advert is used by both master and worker to
exchange state, and to communicate task assignments and results.  A worker
advert looks like this (for an idle worker):

  advert://SAGA:******@advert.cct.lsu.edu:8080/home/merzky/master_worker/merzky/1
    par_out :
    par_in  :
    state   : Idle
    error   :
    task    :
    id      : 1

Looking at the advert will give you a rough idea of the worker state, error
conditions, etc.  You can also communicate with the worker manually.
The following will run /bin/date on a mw_rsh_worker client:

  # saga-advert set_attribute advert://SAGA:******@advert.cct.lsu.edu:8080/home/merzky/master_worker/merzky/1 \
      set_attribute par_in /bin/date
  # saga-advert set_attribute advert://SAGA:******@advert.cct.lsu.edu:8080/home/merzky/master_worker/merzky/1 \
      set_attribute state Assigned

When the worker picks up those new settings, it will run /bin/date, and
change the advert to

  advert://SAGA:******@advert.cct.lsu.edu:8080/home/merzky/master_worker/merzky/1
    par_out : /bin/date
    par_in  : Fri Dec 2 12:59:02 CET 2011
    state   : Done
    error   :
    task    : rsh
    id      : 1

NOTE: the master's base advert URL can be specified via an environment
variable (SAGA_MW_ADVERT_URL).  If the master finds that variable set, its
value will be used and communicated to the workers.  If SAGA_MW_ADVERT_URL is
unset, the master will fall back to an internally predefined URL (see
utils.hpp).


Master/Worker interface
-----------------------

This implementation provides two public classes, in the namespace
saga_pm::master_worker (saga_pm stands for SAGA Programming Models).


Class Worker
............

In order to implement your own workload, you have to derive from the worker
class.  Note that the constructor of the worker needs the URL of the master's
advert directory, in order to register there and to bootstrap communication.

Also in the constructor, you can register whatever task routines you want to
provide, via register_task(), which accepts

  std::string : name of the task
  void *      : this pointer of your class instance
  void *      : pointer to the task's member function

In order to obtain the void* function pointer, you can use the
saga_pm::master_worker::to_voidstar() utility function (please don't look at
its code, just be happy that it exists).

WARNING: incorrectly passed pointers cannot be caught, and will cause
segfaults!

The task's workload goes into the task member routine mentioned above.
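Since register_task() takes untyped void* pointers, a signature mistake only
shows up as a crash.  Purely to illustrate the dispatch idea, and not as this
library's implementation, here is a hypothetical type-safe registry: task
names map to std::function wrappers around member functions with the
argvec_t (argvec_t) signature, so a mismatched function is rejected at compile
time.  task_registry, echo_worker and call_echo are invented names, and
argvec_t is assumed to be a vector of strings, matching how the custom worker
example uses it.

```cpp
#include <functional>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// assumption: mirrors saga_pm::master_worker::argvec_t as a vector of strings
typedef std::vector <std::string> argvec_t;

// Hypothetical type-safe stand-in for register_task() / to_voidstar():
// passing a member function with the wrong signature fails to compile
// instead of segfaulting at run time.
class task_registry
{
  public:
    template <typename T>
    void register_task (const std::string & name, T * obj,
                        argvec_t (T::*fn) (argvec_t))
    {
      // bind object and member function into one callable
      tasks_[name] = [obj, fn] (argvec_t args) { return (obj->*fn) (args); };
    }

    // look up a task by name and run it with the given arguments
    argvec_t run (const std::string & name, const argvec_t & args) const
    {
      auto it = tasks_.find (name);
      if ( it == tasks_.end () )
        throw std::runtime_error ("unknown task: " + name);
      return it->second (args);
    }

  private:
    std::map <std::string, std::function <argvec_t (argvec_t)> > tasks_;
};

// a toy workload with the required signature: returns its arguments
struct echo_worker
{
  argvec_t call_echo (argvec_t args) { return args; }
};
```

With this design, registering &echo_worker::call_echo under "echo" and then
calling run ("echo", args) returns args unchanged, while asking for an
unregistered name throws a std::runtime_error.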
The task routine MUST have the signature:

  argvec_t call_hello (argvec_t args);

where saga_pm::master_worker::argvec_t is really just a
std::vector <std::string>.  The args parameter contains any data the master
gave to specify the actual unit of work; the returned argvec should contain
the results.

NOTE: binary and/or typed parameters are not yet supported - use
serialization if that is needed.

------------------------------------------------------------------------
  #include <unistd.h>   // ::gethostname

  //////////////////////////
  // custom worker example
  class my_worker : public saga_pm::master_worker::worker
  {
    public:

      // c'tor registers 'hello'
      my_worker (saga::url u)
        : saga_pm::master_worker::worker (u)
      {
        register_task ("hello", this,
                       saga_pm::master_worker::to_voidstar (0, &my_worker::call_hello));
      }

      // call_hello is our workload
      saga_pm::master_worker::argvec_t
        call_hello (saga_pm::master_worker::argvec_t args)
      {
        saga_pm::master_worker::argvec_t ret;

        // we expect a greeting!
        if ( ! args.size () || args[0] != "hello" )
          throw "what?";

        char host[256];
        ::gethostname (host, sizeof (host));  // ignore errors
        host[sizeof (host) - 1] = '\0';      // a truncated name may lack the terminator

        ret.push_back (host);

        return ret;
      }
  };
------------------------------------------------------------------------

The example above shows error reporting, too: the throw will cause the worker
to go into Failed state, and the 'error' attribute will be set to 'what?'.
The master can reset the worker to Idle state.


Class Master
............

The master class provides the following set of calls:

  // worker and task management
  id_t worker_start (worker_description & d);
    start a new worker instance, according to the worker description;
    returns its id (an int)

  std::vector <id_t> worker_list (void);
    return the list of registered worker ids

  void worker_stop (id_t id);
    kill a worker, and remove its advert

  void worker_run (id_t id, std::string task, argvec_t args = noargs_);
    run task 'task' on worker 'id' with arguments 'args'.  Duh!
  void shutdown (void);
    call stop() for all workers, remove own advert dir

  // inspection methods
  state       worker_get_state   (id_t id);
  std::string worker_get_error   (id_t id);
  std::string worker_get_task    (id_t id);
  argvec_t    worker_get_args    (id_t id);
  argvec_t    worker_get_results (id_t id);
  void        worker_wait        (id_t id);
  void        worker_reset       (id_t id);
  void        worker_dump        (id_t id);

wait() will block until the worker enters Done, Failed or Quit state.
reset() will remove error info, and clean the worker advert.
dump() will dump the worker advert.

An example application would use the master class as shown below, to run the
hello_world worker from above on 5 workers:

------------------------------------------------------------------------
  /////////////////////////////
  // example master application
  {
    // assumed shorthand for the library namespace
    namespace saga_mw = saga_pm::master_worker;

    // init master
    saga_mw::master m;

    // add 5 workers to the master's worker pool
    std::vector <saga_mw::id_t> ids;

    // first, create the job description part of the worker description
    saga_mw::worker_description wd;

    wd.jd.set_attribute (saga::job::attributes::description_executable, "saga-run.sh");

    std::vector <std::string> args;
    args.push_back ("mw_worker");
    wd.jd.set_vector_attribute (saga::job::attributes::description_arguments, args);

    // then run workers on 5 different backends (resource managers, rm)
    wd.rm = "fork://localhost/";                ids.push_back (m.worker_start (wd));
    wd.rm = "ssh://india.futuregrid.org/";      ids.push_back (m.worker_start (wd));
    wd.rm = "ssh://sierra.futuregrid.org/";     ids.push_back (m.worker_start (wd));
    wd.rm = "ssh://alamo.futuregrid.org/";      ids.push_back (m.worker_start (wd));
    wd.rm = "ssh://amerzky@cyder.cct.lsu.edu/"; ids.push_back (m.worker_start (wd));

    // dump one for fun to stdout
    m.worker_dump (ids[0]);

    // the 'hello' task expects "hello" as its first argument
    saga_mw::argvec_t hello_args;
    hello_args.push_back ("hello");

    // run the hello task on each worker, not waiting for them to finish
    for ( unsigned int i = 0; i < ids.size (); i++ )
      m.worker_run (ids[i], "hello", hello_args);

    // dump one for fun to stdout, again
    m.worker_dump (ids[0]);

    // *now* we wait for all of them to finish, and print results
    for ( unsigned int i = 0; i < ids.size (); i++ )
    {
      m.worker_wait (ids[i]);

      saga_mw::argvec_t res = m.worker_get_results (ids[i]);

      std::cout << "hello (" << ids[i] << ") : ";
      for ( unsigned int j = 0; j < res.size (); j++ )
        std::cout << res[j] << " ";
      std::cout << std::endl;

      // clean up worker state
      m.worker_reset (ids[i]);
    }

    // tell all workers to quit, but don't wait for them to do so
    for ( unsigned int i = 0; i < ids.size (); i++ )
      m.worker_run (ids[i], "quit");
  }
------------------------------------------------------------------------


Additional Notes:
-----------------

Both master and worker log some information via the SAGA log system.  The log
level is CRITICAL; make sure to have your SAGA log settings in place in order
to see the messages.

The code is new, and WILL contain errors.  Also, some API calls have slightly
different signatures than listed above.  Please report errors (and provide
patches!) to saga-users@cct.lsu.edu