// Copyright (c) 2005-2007 Andre Merzky (andre@merzky.net) // Copyright (c) 2005-2007 Hartmut Kaiser (hartmut.kaiser@gmail.com) // Copyright (c) 2005-2006 Stephan Hirmer (shirmer@cct.lsu.edu) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) #ifndef SAGA_IMPL_ENGINE_TASK_HPP #define SAGA_IMPL_ENGINE_TASK_HPP #include #include #include #include #include // lexical_cast will be in TR2 #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if defined(BOOST_MSVC) #pragma warning(push) #pragma warning(disable: 4251 4231 4275 4660) #endif using namespace boost::futures; namespace saga { namespace impl { /** * Generic task class, for encapsulating asynchronous function calls. * * This class is not part of the saga facade, but of the implementation. * Function calls are stored here and executed within a thread. * * The stored function pointer is executed together with the * stored function parameter within a thread as soon as run is called. */ template class task; /////////////////////////////////////////////////////////////////////////// template class task : public saga::impl::task_base, public saga::impl::object { private: typedef saga::impl::task_base base_type; typedef task self_type; /*! The return value */ RetVal retval; /* The function pointer to execute in the thread */ void (FuncBase::*exec) (FuncRetV); /** * The bulk prepare function, which passes the function parameters * to a special bulk adaptor. */ bool (FuncBase::*prep_func) (FuncRetV, saga::uuid); /*! the function to be bound to the future */ int bond (void) { saga::impl::task_base::state_setter setter (*this); int return_code = 1; try { TR1::shared_ptr adp ( TR1::static_pointer_cast(this->cpi_instance_)); (adp.get()->*exec)(retval); // set state to finished setter.state_ = saga::task_base::Done; return_code = 0; } catch ( saga::exception const& e ) { this->found_saga_exception_ = true; this->saved_saga_exception_ = e; } catch ( std::exception const& e ) { this->found_exception_ = true; this->saved_exception_ = e; } catch ( ... ) { TR1::shared_ptr impl( boost::const_pointer_cast( this->cpi_instance_->get_proxy()->shared_from_this())); this->found_saga_exception_ = true; this->saved_saga_exception_ = saga::exception(runtime::get_object(impl), "Unspecified error catched", saga::Unexpected); } return return_code; } public: /** * Default destructor * * waits for the task to finish, before destructing the task. */ ~task (void) { try { while (saga::task_base::Running == get_state() && wait(0)) // while finished sleep(5); // we consider 5msec as a good slice } catch (saga::exception const&) { // nothing to do here, just return } } /** * constructor * * */ task (std::string const & func_name_, TR1::shared_ptr cpi_, void (FuncBase::*func_) (FuncRetV), RetVal ret, bool (FuncBase::*prep_func_) (FuncRetV, saga::uuid) = NULL) : base_type (func_name_, cpi_) , saga::impl::object(saga::object::Task) , retval (ret) , exec (func_) , prep_func (prep_func_) { } /** * Executes the operation which is stored in this task. * @note this function returns immediately. * @throw not_pending exception, if the task you want to run * is not pending. */ int run (void) { // test if there is a function to execute if (exec) { if ( (get_state () != saga::task_base::New) || is_bulk_treated ) { SAGA_THROW("incorrect state: task is not pending!", saga::IncorrectState); } // set state to running, propagate value saga::metric m (this->get_metric(saga::metrics::task_state)); m.set_attribute ( saga::attributes::metric_value, boost::lexical_cast (saga::task_base::Running)); m.fire(); // executing the function, using futures and the boost/bind this->thread_ = simple_future ( TR1::bind (&task::bond, this)); return 1; } // nothing to run??? BOOST_ASSERT(false); return 0; } /** * declared here, implementation is in external file task_get_bulk_adaptor. */ virtual v1_0::cpi* get_bulk_adaptor (std::string const & cpi_name, std::string const & op_name, v1_0::preference_type const & prefs) { proxy * proxy_ = this->cpi_instance_->get_proxy(); saga::session s = proxy_->get_session(); // FIXME: temporary hack! this list should get passed from the outside adaptor_selector::adaptor_info_list_type no_no_list; return (runtime::get_impl(s)->get_adaptor(cpi_name, op_name, prefs, proxy_, no_no_list)); } /*! * passes all the function parameters to a special bulk adaptor. */ virtual void visit_args(v1_0::cpi* bulk_adaptor) { if(NULL != prep_func && NULL != bulk_adaptor && is_bulk_treated) { (static_cast(bulk_adaptor)->*prep_func)(retval, this->get_uuid()); // and save the bulk_adaptor. cpi_instance_ = bulk_adaptor->shared_from_this(); if (will_async_in_adaptor == is_external_bulk_async) { is_external_bulk_async = bulk_async_in_adaptor; } } } /////////////////////////////////////////////////////////////////////// // return the task_interface to the facade virtual saga::impl::task_interface* get_task_interface() { return this; } virtual saga::impl::task_interface const* get_task_interface() const { return this; } /////////////////////////////////////////////////////////////////////// // return the monitorable interface to the facade virtual saga::impl::monitorable* get_monitorable() { return this; } virtual saga::impl::monitorable const* get_monitorable() const { return this; } virtual saga::uuid get_id() const { return this->get_uuid(); } }; /////////////////////////////////////////////////////////////////////////// // bring in higher order functions #include }} // namespace saga::impl #endif // SAGA_IMPL_ENGINE_TASK_HPP