Abstract:
---------

Digedag is an experimental implementation of a SAGA based workflow
package.  It consists of four basic components:

  - an API for programmatically expressing workflows
  - a parser for (abstract or concrete) workflow descriptions
  - a workflow enactor (aka SAGA engine)
  - an in-time workflow scheduler


Introduction:
-------------

Workflows are usually not expressed programmatically, at least not
explicitly.  Instead, end users use graphical user interfaces to define
graphs of depending operations which, if executed in order, comprise
the workflow.

In most cases, the transition from the GUI specification of the
workflow to the enacted workflow includes a number of intermediate
steps which translate the specified 'abstract' (*) workflow into a
'concrete' workflow, which specifically includes information about the
resources to be utilized etc.  Both abstract and concrete workflows are
stored and/or communicated in data files of one format or another (a
wide variety of workflow description languages exists).

Once the workflow is specified and concretized, it can be 'enacted',
i.e. the individual tasks are submitted to the assigned concrete
physical resources, and their execution and communication are
orchestrated according to the specified workflow logic.

Digedag takes a slightly different approach on the end-user side of
that scheme: instead of having the user specify a workflow in a GUI,
digedag provides an API to specify tasks, task dependencies, and
inter-task communication - i.e. to specify workflows programmatically.
Further, digedag includes a workflow scheduler, i.e. an entity which
translates the thus specified abstract workflow into an internal
concrete representation.  And finally, digedag includes a SAGA based
workflow enactor, i.e. an entity which executes and orchestrates the
workflow's execution in a distributed environment.
In order to allow for the integration of digedag with existing workflow
environments, it also includes parsers which use the digedag workflow
API to translate workflow descriptions from several workflow
description languages into digedag representations (**).


Digedag's workflow API
----------------------

Digedag allows workflows to be expressed programmatically.  Basically,
that statement translates to: Digedag provides an API to specify
Directed Acyclic Graphs (DAGs).  The DAGs specified via that API
consist of nodes and edges.  A node can have multiple incoming and
outgoing edges.  An edge has exactly one source node and one target
node.  Loosely speaking, a node represents a job to be executed, and an
edge represents a data item created by one job and consumed by another
one.

The digedag API allows one to specify DAGs as follows:

  {
    // create a new and empty DAG instance
    digedag::dag * d = new digedag::dag;

    // create two nodes for our dag.  A node description is
    // basically a saga::job::description, and can specify the
    // application to run when the node fires
    digedag::node_description nd;

    digedag::node * n1 = new digedag::node (nd);
    digedag::node * n2 = new digedag::node (nd);

    // add the two nodes to the dag, with identifiers
    d->add_node ("node_1", n1);
    d->add_node ("node_2", n2);

    // specify data to be transferred between the two nodes
    digedag::edge * e1 = new digedag::edge ("/tmp/src", "/tmp/tgt");

    // add that edge to the DAG
    d->add_edge (e1, n1, n2);

    // the DAG is complete, and can be run
    d->fire ();

    std::cout << "dag running..." << std::endl;

    // wait until the dag had a chance to finish
    while ( digedag::Running == d->get_state () )
    {
      ::sleep (1);
      std::cout << "dag waiting..." << std::endl;
    }

    // this is the same as the loop above
    d->wait ();

    // done - delete all dag components recursively
    delete d;
  }

The fire() method on the DAG scans all nodes and selects those which
have no pending incoming edges, i.e. which have all prerequisites
resolved.
The applications represented by these nodes are then run.  After
(successful) completion, all outgoing edges of a node are fired, which
causes the data represented by each edge to be transferred from src to
tgt.  If an edge succeeds with the transfer, the tgt node is fired in
turn, etc. etc.  Eventually, the whole DAG (i.e. all nodes and edges)
is completed.


The Digedag Scheduler (***)
---------------------------

A node's node_description, and also an edge's src and tgt
specification, may leave the mapping of the actions to be performed
onto specific resources undefined.  I.e., the node_description may not
contain a host name to run the job on, and the edges originating from
that node may thus also have an unspecified host element in their URLs.
DAGs with such unspecific nodes and edges are called 'abstract DAGs'.

The Digedag scheduler is responsible for translating an abstract DAG
into a concrete DAG, by assigning specific resources to nodes and
edges.  It is invoked when calling

  dag->schedule ();

The digedag scheduler is, however, not limited to that translation, but
can in fact modify the complete DAG and its elements if needed.  Also,
the scheduler is invoked on the following occasions:

  - when dag.schedule() is called
  - when a dag gets created
  - when a dag gets nodes and edges added
  - when a dag gets nodes and edges removed
  - when a dag is fired
  - when a dag completes successfully
  - when a dag completes unsuccessfully
  - when a node is fired
  - when a node succeeds
  - when a node fails
  - when an edge is fired
  - when an edge's data transfer fails
  - when an edge's data transfer succeeds

Thus, the scheduler class is able to react on runtime errors
immediately, can observe all actions on a dag and on its elements, and
can globally optimize execution.


The Digedag Workflow Enactor
----------------------------

If a node's or edge's fire() method gets called, the dependencies of
that component are checked.  If they are all resolved (i.e.
all upstream nodes and edges have been completed without error), the
component state is changed to Running, and a separate thread is started
to perform its specific action: for a node, a saga::job::job instance
is created and started according to the node_description; for an edge,
a saga::filesystem::file copy operation is started, from the edge's src
to its tgt location.  Once that worker thread finishes, the component's
state changes to Done or Failed.

Note that the digedag enactor actually performs local enactment: it
only knows about the operations to be performed by one specific node or
edge.


The Digedag Workflow Parsers
----------------------------

A multitude of workflow description languages exists.  Digedag provides
two exemplary parser classes which allow the translation of Pegasus
input and output workflow descriptions into digedag DAGs.

The parsers are actually rather simple: they create one dag, and for
each node they encounter in the workflow description files, they create
a suitable node_description and add that node to the dag.  For each
data transfer to be performed, the parsers add a data transfer edge
between the respective nodes.  After parsing the workflow description
files, the parser classes return a pointer to the DAG via get_dag(),
for further manipulation by the end user.

----------------------------------------------------------------

(*)   'abstract' because it describes the workflow in terms of abstract
      application requirements, not in terms of concrete resource
      utilization.

(**)  Well, two parsers are included at the moment: for abstract
      workflows in DAX format, and for concrete workflows in Pegasus
      format.  Adding additional parsers is straightforward.

(***) The Digedag scheduler is not really a scheduler...