// Copyright (c) 2005-2007 Andre Merzky (andre@merzky.net) // Copyright (c) 2005-2007 Hartmut Kaiser (hartmut.kaiser@gmail.com) // Copyright (c) 2005-2006 Stephan Hirmer (shirmer@cct.lsu.edu) // // Use, modification and distribution is subject to the Boost Software // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) #include #include #include #include #if ! defined (BOOST_WINDOWS) # include # include #endif #if defined(BOOST_HAS_UNISTD_H) # include #endif #include #include #include #include #include #include #include #include #include #include #include #include /////////////////////////////////////////////////////////////////////////////// namespace saga { namespace impl { task_container::task_container (void) : object(saga::object::TaskContainer) { } task_container::~task_container (void) { // if(NULL != bs.get()) // bs->complete_wait(); } // According to the strawman API, this function throws nothing! // perhaps it should throw AllreadyAdded ? void task_container::add_task (saga::task & t) { task_list.push_back (t); } void task_container::remove_task (saga::task t) { tasks_type::iterator task_iter = task_list.begin (); // searching through the list of tasks // and comparing the 'real' task-object (impl). // see task.hpp bottom for further details. while ( task_iter != task_list.end () ) { if ( *task_iter == t ) { task_list.erase (task_iter); return; } ++task_iter; } SAGA_THROW("task_container: The task to remove was not found!", saga::DoesNotExist); } std::vector task_container::wait (const float& timeout) { // if(NULL != bs.get()) // bs->complete_wait(); size_t task_list_size = task_list.size (); tasks_type ret = get_tasks_with_state (saga::task::Canceled); if ( ret.size () == task_list_size ) SAGA_THROW("All tasks in this container have been canceled!", saga::IncorrectState); // get the finished tasks ret = get_tasks_with_state (saga::task::Done); // if already all the tasks have finished, we return immediately if ( ret.size () == task_list_size ) return ret; // --> 1st case: wait until all tasks have finished if ( timeout < 0 ) { wait_for_all_tasks_to_finish (); return task_list; } // --> 2nd case: wait for timeout seconds and return the finished // tasks if (timeout > 0.0) { time_t start_time = time (0); while ( (timeout - (difftime (time (0), start_time))) > 0 ) { ret = get_tasks_with_state (saga::task::Done); if ( ret.size () == task_list_size ) { break; } // we consider 5msec as a good slice task_base::sleep (5); } return ret; } // --> 3rd case: timeout = 0.0 then return immeadiately return get_tasks_with_state (saga::task::Done); } std::vector task_container::get_tasks_with_state (saga::task::state state) const { tasks_type ret; const_iterator_type end = task_list.end(); for (const_iterator_type it = task_list.begin(); it != end; ++it) { if ( it->get_state () == state ) { ret.push_back (*it); } } return (ret); } void task_container::wait_for_all_tasks_to_finish (void) { // bs->complete_wait(); std::for_each(task_list.begin(), task_list.end(), TR1::bind(&saga::task::wait, _1, -1.0)); } std::vector task_container::get_states (void) const { std::vector ret; const_iterator_type end = task_list.end(); for (const_iterator_type it = task_list.begin(); it != end; ++it) { ret.push_back (it->get_state()); } return (ret); } void task_container::simple_run() { // tasks are now treated one by one, so we reset the bulk-treatment flag. set_tasks_for_bulk_treatement(saga::impl::task_base::not_bulk_treated); std::for_each(task_list.begin(), task_list.end(), TR1::bind(&saga::task::run, _1)); } void task_container::set_state_for_all(saga::task::state s) { iterator_type end = task_list.end(); for (iterator_type it = task_list.begin(); it != end; ++it) { runtime::get_impl(*it)->set_state(s); } } void task_container::set_state_by_uuid(std::vector& ids, saga::task::state s) { typedef std::vector::iterator iterator_type; iterator_type end = ids.end(); for (iterator_type it = ids.begin(); it != end; ++it) { saga::task t = get_task_by_uuid(*it); runtime::get_impl(t)->set_state(s); } } saga::task task_container::get_task_by_uuid(saga::uuid const& id) { iterator_type end = task_list.end(); for (iterator_type it = task_list.begin(); it != end; ++it) { if ( ((*it).get_id()) == id ) return *it; } SAGA_THROW("No such uuid within current task_container", saga::DoesNotExist); return saga::task(saga::task::Done); } void task_container::set_tasks_for_bulk_treatement( task_base::bulk_treatment_state s) { iterator_type end = task_list.end(); for (iterator_type it = task_list.begin(); it != end; ++it) { runtime::get_impl(*it)->set_bulk_treatment(s); } } void task_container::run() { using namespace boost; saga::adaptors::task_blocker_for_bulk bulk_blocker(this); // pass the task to an analyzer bulk_analyser::sorted_tc_type res; if(getenv("SAGA_IMPR_BULK_SORTING") ) { // bulk_analyser_impr::sorted_tc_type res; // saga::impl::bulk_analyser_impr a; // a.analyse_bulk(*this, res); } else { saga::impl::bulk_analyser a; a.analyse_bulk(*this, res); } // according to chosen strategy ... if(getenv("SAGA_TRY_EXEC_BULK_HANDLING") ) { // saga::adaptors::bulk_strategy_try_exec strat; // strat.apply(res); using namespace saga::adaptors; bs = TR1::shared_ptr(new bulk_strategy_try_exec()); bs->apply(res); } else { // saga::adaptors::bulk_strategy_simple_sort strat; // strat.apply(res); using namespace saga::adaptors; bs = TR1::shared_ptr (new bulk_strategy_simple_sort()); bs->apply(res); } } void task_container::cancel (void) { std::for_each(task_list.begin(), task_list.end(), TR1::bind(&saga::task::cancel, _1)); } std::vector task_container::list_tasks (void) const { return (task_list); } } // namespace impl } // namespace saga