Abstract:
---------

Digedag is an experimental implementation of a SAGA based workflow
package.  It consists of four basic components:

  - API for programmatically expressing workflows
  - parser for (abstract or concrete) workflow descriptions
  - workflow enactor (aka SAGA engine)
  - in-time workflow scheduler


Introduction:
-------------

Workflows are often not expressed programmatically, at least not
explicitly.  Instead, end users use graphical user interfaces to
define graphs of dependent operations which, if executed in order,
comprise the workflow.

In most cases, the transition from the GUI specification of the
workflow to the enacted workflow includes a number of intermediate
steps which translate the specified 'abstract'(*) workflow into a
'concrete'(*) workflow, which specifically includes information about
the resources to be utilized etc.  Both abstract and concrete workflows
are stored and/or communicated in data files of one format or another
(a wide variety of workflow description languages exist).

Once the workflow is specified and concretized, it can be 'enacted',
i.e. the individual tasks are submitted to the assigned concrete
physical resources, and their execution and communication is
orchestrated according to the specified workflow logic.

Digedag takes a slightly different approach on the end-user side of
the scheme: instead of specifying a workflow in a GUI, digedag provides
an API to specify tasks, task dependencies, and inter-task
communication, i.e. to specify workflows programmatically.  (Of course,
one could create a GUI which *uses* digedag to translate the visual
representation created by the end user into abstract or concrete
workflows.)

Further, digedag includes a workflow scheduler, i.e. an entity which
translates the thus specified abstract workflow into an internal
concrete representation.  And finally, digedag includes a SAGA based
workflow enactor, i.e.
an entity which executes and orchestrates the workflow in a
distributed environment.

In order to allow for integration of digedag with existing workflow
environments, it also includes parsers which use the digedag workflow
API to translate workflow descriptions from several workflow
description languages into digedag representations(**).


Digedag's Workflow API
----------------------

Digedag allows workflows to be expressed programmatically.  Basically,
that statement translates well into: Digedag provides an API to specify
Directed Acyclic Graphs (DAG, ***).

The DAGs specified via that API consist of nodes and edges.  A node can
have multiple incoming and outgoing edges.  An edge has exactly one
source node and one target node.  Loosely speaking, a node represents a
job to be executed, and an edge represents a data item which is created
by one job and consumed by another one.

The digedag API allows DAGs to be specified as follows:

  ---------------------------------------------------------------
  // create a new and empty DAG instance
  boost::shared_ptr <digedag::dag> d (new digedag::dag);

  // create two nodes for our dag.  A node description is
  // basically a saga::job::description, and can specify the
  // application to run when the node fires.
  // The dag instance from above functions as node factory.
  digedag::node_description nd;
  boost::shared_ptr <digedag::node> n1 = d->create_node (nd);
  boost::shared_ptr <digedag::node> n2 = d->create_node (nd);

  // add the two nodes to the dag, with identifiers
  d->add_node ("node_1", n1);
  d->add_node ("node_2", n2);

  // specify data to be transferred between the two nodes.
  // Again, the dag instance serves as an edge factory.
  boost::shared_ptr <digedag::edge> e1 = d->create_edge ("/tmp/src",
                                                         "/tmp/tgt");

  // add that edge to the DAG
  d->add_edge (e1, n1, n2);

  // the abstract DAG is complete, and can be explicitly
  // translated into a concrete DAG.  This step is optional
  // though, and is implicitly performed on fire() if not
  // called previously.  See the notes on the Scheduler below.
  d->schedule ();

  // the (abstract or concrete) DAG is ready, and can be run
  d->fire ();

  std::cout << "dag running..." << std::endl;

  // wait 'til the dag had a chance to finish
  while ( digedag::Running == d->get_state () )
  {
    ::sleep (1);
    std::cout << "dag waiting..." << std::endl;
  }

  // this is the same as the loop above
  d->wait ();
  ---------------------------------------------------------------

The fire() method on the DAG scans all nodes and selects those which
have no pending incoming edges, i.e. which have all prerequisites
resolved.  The applications represented by these nodes are then run.
After (successful) completion of a node, all outgoing edges of that
node are fired, which causes the data represented by each edge to be
transferred from src to tgt.  If an edge succeeds with the transfer,
its tgt node is then fired, and so on.  Eventually, the whole DAG (with
all its nodes and edges) is completed.


The Digedag Scheduler(****)
---------------------------

A node's node_description, and also an edge's src and tgt
specification, may leave the mapping of the actions to be performed to
specific resources undefined.  I.e., the node_description may not
contain a host name to run the job on, and the edges originating from
that node may thus also have an unspecified host element in their URLs.
DAGs with such unspecific nodes and edges are called 'abstract DAGs'.

The Digedag scheduler is responsible for translating the abstract DAG
into a concrete DAG, by assigning specific resources to nodes and
edges.  It is explicitly invoked when calling

  dag->schedule ();

The digedag scheduler is actually not limited to that translation, but
can in fact modify the complete DAG and its elements as needed.
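The translation from an abstract to a concrete DAG can be illustrated
with a small standalone sketch.  It does not use the digedag or SAGA
classes: sketch_node, is_concrete() and map_round_robin() are
hypothetical names introduced here, and the round-robin policy is only
an assumption for illustration, not the mapping digedag implements.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for a node description: an empty host means
// "not yet mapped", i.e. the node is still abstract.
struct sketch_node
{
  std::string name;
  std::string host;
};

// Returns true if every node has a host assigned (the DAG is concrete).
bool is_concrete (const std::vector <sketch_node> & nodes)
{
  for ( const sketch_node & n : nodes )
  {
    if ( n.host.empty () ) return false;
  }
  return true;
}

// Assigns hosts round-robin to all nodes which have none.  Nodes which
// already name a host are left untouched.
void map_round_robin (std::vector <sketch_node>       & nodes,
                      const std::vector <std::string> & hosts)
{
  assert (! hosts.empty ());

  std::size_t next = 0;

  for ( sketch_node & n : nodes )
  {
    if ( n.host.empty () )
    {
      n.host = hosts[next % hosts.size ()];
      ++next;
    }
  }
}
```

In this sketch, a node which already specifies a host keeps it, so a
partially concrete DAG is completed rather than overwritten.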
The scheduler is invoked on the following occasions:

  - when dag.schedule() is called
  - when a dag gets created
  - when a dag gets nodes and edges added
  - when a dag gets nodes and edges removed
  - when a dag is fired
  - when a dag completes successfully
  - when a dag completes unsuccessfully
  - when a node is fired
  - when a node succeeded
  - when a node failed
  - when an edge is fired
  - when an edge's data transfer failed
  - when an edge's data transfer succeeded

Thus, the scheduler class is able to react to dag creation or runtime
errors immediately, can observe all actions on a dag and on its
elements, and can globally optimize execution.  More details are given
in the 'Internals' section below.


The Digedag Workflow Enactor
----------------------------

If a node's or edge's fire() method gets called, the dependencies of
that component are checked.  If they are all resolved (i.e. all
upstream nodes and edges have been completed without error), the
component state is changed to 'Running', and a separate thread is
started to perform its specific action: for a node, a saga::job::job
instance is created and started according to the node_description; for
an edge, a saga::filesystem::file.copy operation is started from the
edge's src to its tgt location (more details below, in 'Internals').
Once that worker thread finishes, the component's state changes to
'Done' or 'Failed'.

Note that the digedag enactor actually performs a local enactment: it
only knows about the operations to be performed by one specific node or
edge.


The Digedag Workflow Parsers
----------------------------

A multitude of workflow description languages exist.  Digedag provides
two exemplary parser classes which translate Pegasus input and output
workflow descriptions into digedag DAGs.

The parsers are actually rather simple: they create a dag instance, and
for each node they encounter in the workflow description files, they
create a suitable node_description, and add that node to the dag.
For each data transfer to be performed, the parsers add a data transfer
edge between the respective nodes.  After parsing the workflow
description files, the parser classes return the DAG instance via
get_dag(), for further manipulation or enactment by the end user.


Digedag Internals
-----------------

[ This section may be important for Digedag developers, but is not
  necessarily beneficial for Digedag users ]

When starting to change the digedag code, it is important to understand
how scheduler and enactor interact with the DAG tree.  Here are some
notes about that topic, for the current implementation.

  - On most interactions with a dag instance, the dag calls out to its
    scheduler class.  The method names of the scheduler give a good
    idea of those callout points:

      hook_dag_create     (void);
      hook_dag_destroy    (void);
      hook_dag_schedule   (void);
      hook_dag_run_pre    (void);
      hook_dag_run_post   (void);
      hook_dag_run_done   (void);
      hook_dag_run_fail   (void);
      hook_dag_wait       (void);

      hook_node_add       (boost::shared_ptr <node> n);
      hook_node_remove    (boost::shared_ptr <node> n);
      hook_node_run_pre   (boost::shared_ptr <node> n);
      hook_node_run_done  (boost::shared_ptr <node> n);
      hook_node_run_fail  (boost::shared_ptr <node> n);

      hook_edge_add       (boost::shared_ptr <edge> e);
      hook_edge_remove    (boost::shared_ptr <edge> e);
      hook_edge_run_pre   (boost::shared_ptr <edge> e);
      hook_edge_run_done  (boost::shared_ptr <edge> e);
      hook_edge_run_fail  (boost::shared_ptr <edge> e);

    In these hooks, the scheduler has access to the complete dag, and
    can optimize it, check it for consistency, etc. (not much is
    implemented, but new schedulers *can* do that).

  - There are two independent enactors right now: one for edges, one
    for nodes.  It is not necessary to implement it that way.

  - If a dag gets fired, it tries to find any nodes which have no input
    dependencies, and fires those.  That node's fire will then call
    hook_node_run_pre(), which will register the node with the enactor.
    The enactor will move that node into queued_nodes_ (a std::deque),
    and will serve them (i.e.
    run them) first come, first served.

  - The queue gets worked upon in a separate thread: as long as not
    enough nodes or edges are active, the next element is taken from
    the queue, and started.  Starting means:

      - The thread calls back into the node or edge, i.e. it calls
        one of

          saga::thread node::work_start (void);
          saga::thread edge::work_start (void);

        which returns a Running task which represents the workload.
        That task is put into the enactor's task container.  (Note:
        eventually, the returned task should not be running, but should
        be new, and should be started by the enactor.)

  - The enactor is watching all active tasks.  If any task finishes, it
    calls back to the scheduler:

      void work_finished (saga::task t, std::string flag);

  - The scheduler can figure out which node or edge that task belonged
    to (it maintains a node_task_map_ and an edge_task_map_ to do
    that), and in turn calls that entity's work_done or work_failed
    routine, depending on the task's state.  That routine will then
    update the entity's state, and cause it to fire the edges or nodes
    depending on it (if the state is Done).

  - The above description leaves the distinction between scheduler and
    enactor vague.  This is supposed to become more clearly separated
    soon: the scheduler is to become a mapper which only operates on
    the DAG structure and meta data, and the enactor will take over all
    operations on the workload tasks.


----------------------------------------------------------------

(*)    'abstract' because it describes the workflow in abstract
       application requirements, not in terms of concrete resource
       utilization.

(**)   Well, two parsers are included at the moment: for abstract
       workflows in DAX format, and for concrete workflows in Pegasus
       format.  Adding additional parsers is straightforward.

(***)  The name 'Digedag' refers to an East German comic series (see
       http://en.wikipedia.org/wiki/Mosaik#Digedags_years).
       Naming software is always painful, but digedag was the first
       German word coming to my mind which contained 'dag' ;-)  One
       could stretch the argument though that Dig, Dag and Digedag are
       three very different persons, acting in concert...

(****) The digedag scheduler is not really a scheduler, but rather a
       mapper.  A rename of this class/component is planned.
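As a closing illustration, the firing rule described for fire() (nodes
without pending incoming edges fire first, and each completed edge may
release its target node) can be sketched in a few lines of standalone
C++.  The sketch assumes that all nodes and edges succeed, runs
sequentially, and uses the hypothetical names sketch_dag and
fire_order(); it does not reflect the threaded digedag enactor.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified DAG: nodes are names, edges are (src, tgt)
// pairs.
struct sketch_dag
{
  std::vector <std::string>                           nodes;
  std::vector <std::pair <std::string, std::string> > edges;
};

// Returns the order in which nodes would fire if every node and every
// edge succeeded.
std::vector <std::string> fire_order (const sketch_dag & d)
{
  // count the pending incoming edges per node
  std::map <std::string, std::size_t> pending;
  for ( const std::string & n : d.nodes ) pending[n] = 0;
  for ( const auto        & e : d.edges ) ++pending[e.second];

  // nodes without prerequisites can fire immediately
  std::vector <std::string> order;
  for ( const std::string & n : d.nodes )
  {
    if ( pending[n] == 0 ) order.push_back (n);
  }

  // 'order' doubles as work queue: handle the fired nodes in turn
  for ( std::size_t i = 0; i < order.size (); ++i )
  {
    const std::string current = order[i];  // copy, as 'order' may grow

    for ( const auto & e : d.edges )
    {
      if ( e.first != current ) continue;

      // the outgoing edge fires and resolves one dependency of its
      // target; the target fires once no dependency is left
      assert (pending[e.second] > 0);
      if ( --pending[e.second] == 0 ) order.push_back (e.second);
    }
  }

  return order;
}
```

Nodes which sit on a cycle never reach a pending count of zero, so in
this sketch they simply do not appear in the returned order.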