//  Copyright (c) 2005-2007 Andre Merzky (andre@merzky.net)
//  Copyright (c) 2005-2007 Hartmut Kaiser (hartmut.kaiser@gmail.com)
//  Copyright (c) 2005-2006 Stephan Hirmer (shirmer@cct.lsu.edu)
//
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#ifndef SAGA_IMPL_ENGINE_WRAPPER_TASK_HPP
#define SAGA_IMPL_ENGINE_WRAPPER_TASK_HPP

// NOTE(review): in this copy of the file every #include directive has lost
// its header name, and every template parameter/argument list (the text
// between '<' and '>') is missing as well. The code tokens below are kept
// exactly as found; restore the missing pieces from version control before
// attempting to build this header.
#include
#include
#include
#include
#include    // lexical_cast will be in TR2
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

///////////////////////////////////
// NOTE(review): a warning(push) is issued here, but no matching
// warning(pop) is visible anywhere in this file -- confirm that it is
// restored elsewhere (possibly in the header included at the bottom).
#if defined(BOOST_MSVC)
#pragma warning(push)
#pragma warning(disable: 4251 4231 4275 4660)
#endif

// NOTE(review): a using-directive at file scope in a header is visible to
// every translation unit that includes this file.
using namespace boost::futures;

namespace saga { namespace impl {

    /**
     * Generic task class, for encapsulating asynchronous function calls.
     *
     * Almost the same as saga::impl::task, with the slight difference
     * that here the function to execute takes one further parameter,
     * namely a saga::uuid (the task's own id, see bond() and visit_args()).
     * This is needed to enable monitoring of tasks where the adaptor
     * implements the asynchronicity.
     *
     * @see saga::impl::task for a detailed description.
     */
    template
    class wrapper_task;

    ///////////////////////////////////////////////////////////////////////////
    // Definition of wrapper_task. It derives from task_base and object and
    // stores two member-function pointers into FuncBase: the operation itself
    // (exec) and an optional preparation hook used for bulk handling
    // (prep_func).
    template
    class wrapper_task
        : public saga::impl::task_base,
          public saga::impl::object
    {
    private:
        typedef saga::impl::task_base base_type;
        typedef wrapper_task self_type;

        // value handed (as FuncRetV) to exec/prep_func on every invocation
        RetVal retval;

        // the operation to run; invoked from bond()
        void (FuncBase::*exec) (FuncRetV, saga::uuid);

        // optional hook invoked from visit_args(); defaults to NULL
        bool (FuncBase::*prep_func) (FuncRetV, saga::uuid);

        /*!
         * The function to be bound to the future (see run()).
         *
         * Casts cpi_instance_ and invokes exec on it with retval and this
         * task's uuid. No exception leaves the try block; each failure is
         * recorded in the found_* / saved_* members instead.
         *
         * @return 0 if exec returned normally, 1 if any exception was caught.
         */
        int bond (void)
        {
            // NOTE(review): presumably state_setter applies state_ to the
            // task when it goes out of scope -- its definition is not
            // visible here, confirm in task_base.
            saga::impl::task_base::state_setter setter (*this);
            int return_code = 1;    // pessimistic default, cleared on success

            try {
                TR1::shared_ptr adp(
                    TR1::static_pointer_cast(this->cpi_instance_));

                (adp.get()->*exec)(retval, this->get_uuid());

                // set state to finished
                setter.state_ = saga::task::Done;
                return_code = 0;
            }
            catch ( saga::exception const& e ) {
                this->found_saga_exception_ = true;
                this->saved_saga_exception_ = e;
            }
            catch ( std::exception const& e ) {
                this->found_exception_ = true;
                // NOTE(review): if saved_exception_ is a plain
                // std::exception, this assignment keeps only the base part
                // of e (dynamic type and message may be lost) -- confirm
                // the member's type.
                this->saved_exception_ = e;
            }
            catch ( ... ) {
                // unknown exception type: report it as a saga::exception
                // attached to the proxy object of the cpi instance
                TR1::shared_ptr impl(
                    boost::const_pointer_cast(
                        this->cpi_instance_->get_proxy()->shared_from_this()));

                this->found_saga_exception_ = true;
                this->saved_saga_exception_ =
                    saga::exception(runtime::get_object(impl),
                        "Unspecified error catched", saga::Unexpected);
            }
            return return_code;
        }

    public:
        /**
         * Destructor: polls the task while it reports the Running state,
         * sleeping between polls, and swallows any saga::exception raised
         * while doing so.
         *
         * NOTE(review): only saga::exception is caught here; any other
         * exception type thrown by get_state(), wait() or sleep() would
         * leave the destructor -- confirm this cannot happen.
         */
        ~wrapper_task (void)
        {
            try {
                // NOTE(review): the loop continues only while wait(0)
                // evaluates to true; the return-value semantics of wait()
                // are not visible here -- confirm the condition is not
                // inverted.
                while (saga::task_base::Running == get_state() && wait(0))
                    // NOTE(review): the original remark calls this a 5 msec
                    // slice; which sleep() overload is used (and its unit)
                    // is not visible here -- confirm.
                    sleep(5);
            }
            catch (saga::exception const&) {
                // nothing to do here, just return
            }
        }

        /**
         * Constructs the task.
         *
         * @param func_name_  forwarded to the task_base constructor
         * @param cpi_        forwarded to the task_base constructor
         * @param func_       member function executed by bond()
         * @param ret         value stored in retval and passed to func_
         * @param prep_func_  optional member function executed by
         *                    visit_args(); defaults to NULL
         */
        wrapper_task (std::string const & func_name_,
                TR1::shared_ptr cpi_,
                void (FuncBase::*func_) (FuncRetV, saga::uuid),
                RetVal ret,
                bool (FuncBase::*prep_func_) (FuncRetV, saga::uuid) = NULL)
          : base_type (func_name_, cpi_)
          , retval (ret)
          , exec (func_)
          , prep_func (prep_func_)
        {
        }

        /**
         * Starts the operation stored in exec by binding bond() into a
         * future, which is assigned to thread_.
         *
         * Before starting, the task_state metric is set to
         * saga::task::Running and fired.
         *
         * @return 1 once the future has been created; 0 if exec is null
         *         (after BOOST_ASSERT(false)).
         * @throw  raises saga::IncorrectState through SAGA_THROW if the task
         *         is not in state saga::task::New or is_bulk_treated is set.
         */
        int run (void)
        {
            // test if there is a function to execute
            if (exec)
            {
                if ( (get_state () != saga::task::New) || is_bulk_treated )
                {
                    SAGA_THROW("incorrect state: task is not pending!",
                        saga::IncorrectState);
                }

                // set state to running, propagate value
                saga::metric m (this->get_metric(saga::metrics::task_state));
                m.set_attribute( saga::attributes::metric_value,
                    boost::lexical_cast (saga::task::Running));
                m.fire();

                // executing the function, using futures and the boost/bind
                // (the bound callable holds a raw 'this' pointer)
                this->thread_ = simple_future (
                    TR1::bind (&wrapper_task::bond, this));

                return 1;
            }

            // nothing to run???
            BOOST_ASSERT(false);
            return 0;
        }

        /*!
         * Declared here only; no definition is visible in this part of
         * the file.
         *
         * @see cpi class for a description of this function
         */
        virtual v1_0::cpi* get_bulk_adaptor(std::string const & cpi_name,
            std::string const & op_name,
            v1_0::preference_type const & prefs);

        /*!
         * If a prep_func was supplied, bulk_adaptor is non-null and
         * is_bulk_treated is set: invokes prep_func on bulk_adaptor with
         * retval and this task's uuid, then stores bulk_adaptor as the new
         * cpi_instance_. Otherwise does nothing.
         *
         * @see cpi class for a description of this function
         */
        virtual void visit_args(v1_0::cpi* bulk_adaptor)
        {
            if(NULL != prep_func && NULL != bulk_adaptor && is_bulk_treated)
            {
                // execute the encapsulated function
                // (its bool result is not inspected).
                (static_cast(bulk_adaptor)->*prep_func)(retval,
                    this->get_uuid());

                // and save the bulk_adaptor.
                cpi_instance_ = bulk_adaptor->shared_from_this();

                // switch the flag from will_async_in_adaptor to
                // bulk_async_in_adaptor; any other value is left untouched
                if (will_async_in_adaptor == is_external_bulk_async)
                {
                    is_external_bulk_async = bulk_async_in_adaptor;
                }
            }
        }

        ///////////////////////////////////////////////////////////////////////
        //  return the task_interface to the facade
        virtual saga::impl::task_interface* get_task_interface()
            { return this; }
        virtual saga::impl::task_interface const* get_task_interface() const
            { return this; }

        ///////////////////////////////////////////////////////////////////////
        //  return the monitorable interface to the facade
        virtual saga::impl::monitorable* get_monitorable()
            { return this; }
        virtual saga::impl::monitorable const* get_monitorable() const
            { return this; }

        //  return this task's id, which is the same value as get_uuid()
        virtual saga::uuid get_id() const
            { return this->get_uuid(); }
    };

///////////////////////////////////////////////////////////////////////////
//  bring in higher order functions
//  NOTE(review): the header name of this include is missing in this copy.
#include

}} // namespace saga::impl

#endif // SAGA_IMPL_ENGINE_WRAPPER_TASK_HPP