// Copyright (c) 2005-2008 Hartmut Kaiser // Copyright (c) 2005-2007 Andre Merzky (andre@merzky.net) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) #include #include #include #include #include #include #include #include #include #include "stream_adaptor.hpp" #include "stream.hpp" /////////////////////////////////////////////////////////////////////////////// namespace stream { /////////////////////////////////////////////////////////////////////////// namespace detail { std::string get_host(saga::url const& url) { return url.get_host(); } std::string get_port(saga::url const& url) { int port = url.get_port(); if (-1 == port) port = SAGA_DEFAULT_STREAM_DEFAULT_PORT; return boost::lexical_cast(port); } } /////////////////////////////////////////////////////////////////////////////// stream::stream (proxy* p, cpi_info const& info, saga::ini::ini const& glob_ini, saga::ini::ini const& adap_ini, TR1::shared_ptr adaptor) : base_cpi (p, info, adaptor, cpi::Fixme) { { // we implement only 'any' and 'tcp' schemes instance_data_type data(this); saga::url service_url(data->location_); std::string scheme(service_url.get_scheme()); if (!scheme.empty() && scheme != "tcp" && scheme != "any") { SAGA_ADAPTOR_THROW("Can't use schemes others from 'any' or 'tcp' " "for stream.", saga::NotImplemented); } } saga::stream::state state = saga::stream::New; { dyn_instance_data_type data(this); if (data.is_valid()) { // initialize if we already have a connected socket mutex_type::scoped_lock l(this->mtx_); sock_ = data->sock_; state = saga::stream::Open; } } update_state(state); } /////////////////////////////////////////////////////////////////////////////// stream::~stream (void) { try { // make sure connection gets closed before the end if (sock_->is_open()) sock_->close(); sock_.reset(); } catch (boost::system::system_error const& e) { SAGA_OSSTREAM errstrm; errstrm << "stream::~stream: could not close connection: " << e.what(); SAGA_LOG_CRITICAL(SAGA_OSSTREAM_GETSTRING(errstrm)); } } /////////////////////////////////////////////////////////////////////////////// void stream::update_state(saga::stream::state newstate) { // update state attribute and promote state change to the metric callbacks saga::monitorable monitor (this->proxy_); saga::adaptors::metric m ( monitor.get_metric(saga::stream::metrics::stream_state)); m.set_attribute(saga::attributes::metric_value, boost::lexical_cast(newstate)); m.fire(); // if we reached the 'Error' state, fire the 'stream_exception' metric if (newstate == saga::stream::Error) { m = monitor.get_metric(saga::stream::metrics::stream_exception); m.set_attribute(saga::attributes::metric_value, "1"); m.fire(); } } saga::stream::state stream::retrieve_state() { // get state attribute saga::monitorable monitor (this->proxy_); saga::metric m (monitor.get_metric(saga::stream::metrics::stream_state)); return (saga::stream::state)boost::lexical_cast( m.get_attribute(saga::attributes::metric_value)); } /////////////////////////////////////////////////////////////////////////////// // get_url() void stream::sync_get_url (saga::url & url) { instance_data_type data (this); url = data->location_; } void stream::sync_get_context(saga::context& ctx) { SAGA_ADAPTOR_THROW("Not Implemented (stream::sync_get_context)", saga::NotImplemented); } void stream::sync_connect(saga::context& ctx) { // verify stream is still 'New' saga::stream::state state = retrieve_state(); if (saga::stream::New != state) { SAGA_OSSTREAM errstrm; errstrm << "stream::sync_connect: stream is not in 'New' state, " "current state is: " << saga::adaptors::get_state_name(state); SAGA_ADAPTOR_THROW(SAGA_OSSTREAM_GETSTRING(errstrm), saga::IncorrectState); } // now try to connect try { saga::url location; { instance_data_type data(this); location = data->location_; } // try to resolve the given url using boost::asio::ip::tcp; tcp::resolver resolver(get_adaptor()->io_service_); tcp::resolver::query query(tcp::v4(), detail::get_host(location), detail::get_port(location)); tcp::resolver::iterator iterator = resolver.resolve(query); // create and connect the socket mutex_type::scoped_lock l(this->mtx_); sock_.reset(new tcp::socket(get_adaptor()->io_service_)); sock_->connect(*iterator); update_state(saga::stream::Open); } catch (boost::system::system_error const& e) { // set stream state update_state(saga::stream::Error); // throw appropriate exception SAGA_OSSTREAM errstrm; if (e.code() == boost::asio::error::already_connected) { // port already taken, spec requires to throw a BadParameter error errstrm << "stream::sync_connect: resource information given in " "the URL cannot be used temporarily (port already taken?): " << e.code().message(); SAGA_ADAPTOR_THROW(SAGA_OSSTREAM_GETSTRING(errstrm), saga::BadParameter); } errstrm << "stream::sync_connect: could not establish connection: " << e.what(); SAGA_ADAPTOR_THROW(SAGA_OSSTREAM_GETSTRING(errstrm), saga::NoSuccess); } } void stream::sync_wait(std::vector& ret, saga::stream::activity act, double timeout) { SAGA_ADAPTOR_THROW("Not Implemented (stream::sync_wait)", saga::NotImplemented); } void stream::sync_close(saga::impl::void_t& ret, double timeout) { // verify stream is 'Open' saga::stream::state state = retrieve_state(); if (saga::stream::Open != state) { SAGA_OSSTREAM errstrm; errstrm << "stream::sync_close: stream is not in 'Open' state, " "current state is: " << saga::adaptors::get_state_name(state); SAGA_ADAPTOR_THROW(SAGA_OSSTREAM_GETSTRING(errstrm), saga::IncorrectState); } // positive timeout is not supported if (timeout > 0.0) { SAGA_ADAPTOR_THROW( "stream_service::sync_close: positive timeout parameter values " "are not supported.", saga::BadParameter); } // close the socket and free associated resources try { sock_->close(); sock_.reset(); update_state(saga::stream::Closed); } catch (boost::system::system_error const& e) { // set stream state update_state(saga::stream::Error); // throw appropriate exception SAGA_OSSTREAM errstrm; errstrm << "stream::sync_close: could not close connection: " << e.what(); SAGA_ADAPTOR_THROW(SAGA_OSSTREAM_GETSTRING(errstrm), saga::NoSuccess); } } void stream::sync_read(saga::ssize_t& ret, saga::mutable_buffer data, saga::ssize_t len_in) { SAGA_ADAPTOR_THROW("Not Implemented (stream::sync_read)", saga::NotImplemented); } void stream::sync_write(saga::ssize_t& ret, saga::const_buffer data, saga::ssize_t len_in) { SAGA_ADAPTOR_THROW("Not Implemented (stream::sync_write)", saga::NotImplemented); } #if !defined(SAGA_DEFAULT_STREAM_ADAPTOR_NO_ASYNCS) /////////////////////////////////////////////////////////////////////////////// // This adaptor implements the async functions based on its own synchronous // functions for testing purposes only. Since there is no principal need // to do so, we allow these to be 'switched off'. saga::task stream::async_get_url () { return (saga::adaptors::task("stream::async_get_url", shared_from_this(), &stream::sync_get_url)); } saga::task stream::async_get_context() { return (saga::adaptors::task("stream::async_get_context", shared_from_this(), &stream::sync_get_context)); } saga::task stream::async_connect() { return (saga::adaptors::task("stream::async_connect", shared_from_this(), &stream::sync_connect)); } saga::task stream::async_wait(saga::stream::activity act, double timeout) { return (saga::adaptors::task("stream::async_wait", shared_from_this(), &stream::sync_wait, act, timeout)); } saga::task stream::async_close(double timeout) { return (saga::adaptors::task("stream::async_close", shared_from_this(), &stream::sync_close, timeout)); } saga::task stream::async_read(saga::mutable_buffer data, saga::ssize_t len_in) { return (saga::adaptors::task("stream::async_read", shared_from_this(), &stream::sync_read, data, len_in)); } saga::task stream::async_write(saga::const_buffer data, saga::ssize_t len_in) { return (saga::adaptors::task("stream::async_write", shared_from_this(), &stream::sync_write, data, len_in)); } #endif } // namespace stream