// Copyright (c) 2006 Stephan Hirmer (shirmer@cct.lsu.edu) // Copyright (c) 2005-2007 Hartmut Kaiser (hartmut.kaiser@gmail.com) // // Use, modification and distribution is subject to the Boost Software // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) // lexical_cast will be in TR2 #include #include #include #include #include #include /////////////////////////////////////////////////////////////////////////////// namespace saga { namespace impl { task_base::~task_base (void) { } task_base::task_base(saga::task_base::state s /*= task_base::New*/) : found_exception_ (false), found_saga_exception_ (false), saved_saga_exception_ (saga::object(), "", Success), is_external_async (false), is_bulk_treated (not_bulk_treated), is_external_bulk_async(not_async_in_adaptor) { if (s == task_base::Done) { get_state_func = TR1::bind(&task_base::get_state_done, this); } else { get_state_func = TR1::bind(&task_base::get_state_task, this); create_state_metric(s); } } task_base::task_base (std::string const & func_name, TR1::shared_ptr cpi_instance, task_base::state s) : func_name_ (func_name), cpi_instance_ (cpi_instance), proxy_ (cpi_instance->get_proxy()->shared_from_this()), found_exception_ (false), found_saga_exception_ (false), saved_saga_exception_ (saga::object(), "", Success), is_external_async (false), is_bulk_treated (not_bulk_treated), is_external_bulk_async(not_async_in_adaptor) { if (s == task_base::Done) { get_state_func = TR1::bind(&task_base::get_state_done, this); } else { get_state_func = TR1::bind(&task_base::get_state_task, this); create_state_metric(s); } } saga::session task_base::get_session(void) const { return cpi_instance_->get_session(); } void task_base::create_state_metric(saga::task_base::state s) { saga::metric state (runtime::get_object(proxy_), saga::metrics::task_state, "Metric to monitor the current state of the task, " "e.g. pending, running, canceled or finished", attributes::metric_mode_readwrite, "1", attributes::metric_type_int, boost::lexical_cast (s)); this->add_metric_to_metrics(state); } /** * In general, there are two cases: * 1. the adaptor provides a synchronous function * 2. the adaptor provides a asynchronous function. * * In the first case: * hence, the task itself provides the asynchronity * by creating a new thread to run the adaptor function therein * Then, the task is able to return information about * its state by itself. * In the second case: * the asynchronity is provided by the adaptor, so the task * has no information about the current execution status * of the function it executes. * The only thing, it can do, is to ask the adaptor for this * information and to pass it back to the caller. */ saga::task_base::state task_base::get_state (void) const { return get_state_func(); } saga::task_base::state task_base::get_state_running (void) const { return saga::task_base::Running; } saga::task_base::state task_base::get_state_done (void) const { return saga::task_base::Done; } saga::task_base::state task_base::get_state_adaptor (void) const { return cpi_instance_->get_state(this->get_id()); } saga::task_base::state task_base::get_state_task (void) const { saga::metric state (this->get_metric(saga::metrics::task_state)); return (saga::task_base::state) boost::lexical_cast ( state.get_attribute (saga::attributes::metric_value)); } std::string task_base::get_func_name(void) const { return func_name_; } /////////////////////////////////////////////////////////////////////////// v1_0::cpi* task_base::get_bulk_adaptor(std::string const& cpi_name, std::string const& op_name, std::multimap const& prefs) { SAGA_THROW("Do not call get_bulk_adaptor() on 'task_base' class!", saga::NotImplemented); return NULL; } void task_base::visit_args(v1_0::cpi* bulk_adaptor) { SAGA_THROW("Do not call visit_args() on 'task_base' class!", saga::NotImplemented); } int task_base::run (void) { SAGA_THROW("Do not call run() on 'task_base' class!", saga::NotImplemented); return 0; } saga::uuid task_base::get_id() const { SAGA_THROW("Do not call get_id() on 'task_base' class!", saga::NotImplemented); return saga::uuid(); } bool task_base::wait (double timeout) { // TODO HARTMUT: timeout in futures // use a consistent value throughout this routine saga::task_base::state state = get_state(); if ( saga::task_base::Canceled == state) { SAGA_THROW("not not running: task was canceled!", saga::IncorrectState); } if ( (saga::task_base::New == state) && (!is_bulk_treated) && (!is_external_async) ) { SAGA_THROW("task not running, yet: is still pending!", saga::IncorrectState); } // if the task has finished, we return immediately if ( saga::task_base::Done == state || saga::task_base::Failed == state) { return true; } // if there is bulk treatment applied to this task and if // this bulk treatment is implemented asynchronously in the // bulk adaptor, we wait for this task to finish // and set its state to DONE. if (is_bulk_treated && bulk_async_in_adaptor == is_external_bulk_async && cpi_instance_->wait(timeout, this->get_id()) ) { // FIXME: this is correct only for timeout < 0 if (actual_bulk_treated == is_bulk_treated) set_state(saga::task_base::Done); return true; } // if the adaptor provides the asynchronism, we ask him to // wait for the task. if (is_external_async && !is_bulk_treated && cpi_instance_->wait(timeout, this->get_id()) ) { // FIXME: this is correct only for timeout < 0 if (actual_bulk_treated == is_bulk_treated) set_state(saga::task::Done); return true; } // --> 1st case: wait until the task has finished if ( timeout < 0.0 ) { // blocking call, in order to wait for the thread to finish! thread_(); return true; } // --> 2nd case: wait for timeout seconds and return the finished tasks if ( timeout > 0.0 ) { time_t start_time = time (0); while ( (timeout - (difftime (time(0), start_time))) > 0 ) { if ( saga::task_base::Running != get_state () ) return true; // we consider 5msec as a good slice saga::impl::task_base::sleep (5); } } // timeout == 0.0, or after waiting for timeout if ( saga::task_base::Done == get_state () ) return true; // the task is already in the Done state else return false; } // end wait void task_base::rethrow (void) const { if ( found_saga_exception_ ) { throw saved_saga_exception_; } if ( found_exception_ ) { throw saved_exception_; } } void task_base::cancel (void) { // TODO HARTMUT: provide cancel on future // Short description what is happening without cancel! // -> the current function call keeps going until it finishes by its own // with success or until it is aborted because of exceptions etc // -> towards application, it seems that the task is canceled, but it is not, // so problems might arise because of the application changing variables // used by the still running task, or even the other way round // -> etc etc // Conclusion: a real cancel operation, killing the thread seems to be // necessary! => thread safety, etc // FIXME: use attrib! // TODO: consider bulk treatment while implementing cancel. // if the adaptor handles the asynchronity by itself, // we ask him to finish the execution of this task. if (is_external_async) { cpi_instance_->cancel(this->get_id()); return; } // change task state and promote the new value to the listeners set_state(saga::task_base::Canceled); } void task_base::set_state(saga::task_base::state s) { // change task state and promote the new value to the listeners saga::metric m(this->get_metric(saga::metrics::task_state)); saga::attribute(m).set_attribute ( saga::attributes::metric_value, boost::lexical_cast (s)); m.fire(); } void task_base::set_external_treatment (bool is_external_treated_/* = true*/) { is_external_async = is_external_treated_; if (not_bulk_treated == is_bulk_treated && is_external_async) { get_state_func = TR1::bind(&task_base::get_state_adaptor, this); return; } get_state_func = TR1::bind(&task_base::get_state_task, this); } void task_base::set_bulk_treatment (bulk_treatment_state s/*=actual_bulk_treated*/) { is_bulk_treated = s; if (actual_bulk_treated == is_bulk_treated) { get_state_func = TR1::bind(&task_base::get_state_running, this); return; } if (not_bulk_treated == is_bulk_treated && is_external_async) { get_state_func = TR1::bind(&task_base::get_state_adaptor, this); return; } get_state_func = TR1::bind(&task_base::get_state_task, this); } void task_base::set_external_bulk_async(bulk_async_in_adaptor_state s /*= will_async_in_adaptor*/) { is_external_bulk_async = s; if (bulk_async_in_adaptor == is_external_bulk_async && was_bulk_treated == is_bulk_treated) { get_state_func = TR1::bind(&task_base::get_state_adaptor, this); return; } get_state_func = TR1::bind(&task_base::get_state_task, this); } /////////////////////////////////////////////////////////////////////////////// }} // namespace saga::impl