summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '3rdParty/Boost/src/boost/asio/detail/reactive_socket_service.hpp')
-rw-r--r--3rdParty/Boost/src/boost/asio/detail/reactive_socket_service.hpp1195
1 files changed, 555 insertions, 640 deletions
diff --git a/3rdParty/Boost/src/boost/asio/detail/reactive_socket_service.hpp b/3rdParty/Boost/src/boost/asio/detail/reactive_socket_service.hpp
index 95d39dd..5f7bbf5 100644
--- a/3rdParty/Boost/src/boost/asio/detail/reactive_socket_service.hpp
+++ b/3rdParty/Boost/src/boost/asio/detail/reactive_socket_service.hpp
@@ -17,18 +17,17 @@
#include <boost/asio/detail/push_options.hpp>
-#include <boost/asio/detail/push_options.hpp>
-#include <boost/shared_ptr.hpp>
-#include <boost/asio/detail/pop_options.hpp>
-
#include <boost/asio/buffer.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/socket_base.hpp>
#include <boost/asio/detail/bind_handler.hpp>
-#include <boost/asio/detail/handler_base_from_member.hpp>
+#include <boost/asio/detail/buffer_sequence_adapter.hpp>
+#include <boost/asio/detail/fenced_block.hpp>
#include <boost/asio/detail/noncopyable.hpp>
-#include <boost/asio/detail/service_base.hpp>
+#include <boost/asio/detail/null_buffers_op.hpp>
+#include <boost/asio/detail/reactor.hpp>
+#include <boost/asio/detail/reactor_op.hpp>
#include <boost/asio/detail/socket_holder.hpp>
#include <boost/asio/detail/socket_ops.hpp>
#include <boost/asio/detail/socket_types.hpp>
@@ -37,10 +36,8 @@ namespace boost {
namespace asio {
namespace detail {
-template <typename Protocol, typename Reactor>
+template <typename Protocol>
class reactive_socket_service
- : public boost::asio::detail::service_base<
- reactive_socket_service<Protocol, Reactor> >
{
public:
// The protocol type.
@@ -67,7 +64,7 @@ public:
private:
// Only this service will have access to the internal values.
- friend class reactive_socket_service<Protocol, Reactor>;
+ friend class reactive_socket_service<Protocol>;
// The native socket representation.
socket_type socket_;
@@ -87,7 +84,7 @@ public:
// User wants connection_aborted errors, which are disabled by default.
enable_connection_aborted = 4,
- // The user set the linger option. Needs to be checked when closing.
+ // The user set the linger option. Needs to be checked when closing.
user_set_linger = 8
};
@@ -98,17 +95,13 @@ public:
protocol_type protocol_;
// Per-descriptor data used by the reactor.
- typename Reactor::per_descriptor_data reactor_data_;
+ reactor::per_descriptor_data reactor_data_;
};
- // The maximum number of buffers to support in a single operation.
- enum { max_buffers = 64 < max_iov_len ? 64 : max_iov_len };
-
// Constructor.
reactive_socket_service(boost::asio::io_service& io_service)
- : boost::asio::detail::service_base<
- reactive_socket_service<Protocol, Reactor> >(io_service),
- reactor_(boost::asio::use_service<Reactor>(io_service))
+ : io_service_impl_(use_service<io_service_impl>(io_service)),
+ reactor_(use_service<reactor>(io_service))
{
reactor_.init_task();
}
@@ -452,43 +445,30 @@ public:
return ec;
}
- if (command.name() == static_cast<int>(FIONBIO))
+ socket_ops::ioctl(impl.socket_, command.name(),
+ static_cast<ioctl_arg_type*>(command.data()), ec);
+
+ // When updating the non-blocking mode we always perform the ioctl
+ // syscall, even if the flags would otherwise indicate that the socket is
+ // already in the correct state. This ensures that the underlying socket
+ // is put into the state that has been requested by the user. If the ioctl
+ // syscall was successful then we need to update the flags to match.
+ if (!ec && command.name() == static_cast<int>(FIONBIO))
{
- // Flags are manipulated in a temporary variable so that the socket
- // implementation is not updated unless the ioctl operation succeeds.
- unsigned char new_flags = impl.flags_;
if (*static_cast<ioctl_arg_type*>(command.data()))
- new_flags |= implementation_type::user_set_non_blocking;
- else
- new_flags &= ~implementation_type::user_set_non_blocking;
-
- // Perform ioctl on socket if the non-blocking state has changed.
- if (!(impl.flags_ & implementation_type::non_blocking)
- && (new_flags & implementation_type::non_blocking))
{
- ioctl_arg_type non_blocking = 1;
- socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec);
- }
- else if ((impl.flags_ & implementation_type::non_blocking)
- && !(new_flags & implementation_type::non_blocking))
- {
- ioctl_arg_type non_blocking = 0;
- socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec);
+ impl.flags_ |= implementation_type::user_set_non_blocking;
}
else
{
- ec = boost::system::error_code();
+ // Clearing the non-blocking mode always overrides any internally-set
+ // non-blocking flag. Any subsequent asynchronous operations will need
+ // to re-enable non-blocking I/O.
+ impl.flags_ &= ~(implementation_type::user_set_non_blocking
+ | implementation_type::internal_non_blocking);
}
-
- // Update socket implementation's flags only if successful.
- if (!ec)
- impl.flags_ = new_flags;
- }
- else
- {
- socket_ops::ioctl(impl.socket_, command.name(),
- static_cast<ioctl_arg_type*>(command.data()), ec);
}
+
return ec;
}
@@ -553,23 +533,11 @@ public:
return 0;
}
- // Copy buffers into array.
- socket_ops::buf bufs[max_buffers];
- typename ConstBufferSequence::const_iterator iter = buffers.begin();
- typename ConstBufferSequence::const_iterator end = buffers.end();
- size_t i = 0;
- size_t total_buffer_size = 0;
- for (; iter != end && i < max_buffers; ++iter, ++i)
- {
- boost::asio::const_buffer buffer(*iter);
- socket_ops::init_buf(bufs[i],
- boost::asio::buffer_cast<const void*>(buffer),
- boost::asio::buffer_size(buffer));
- total_buffer_size += boost::asio::buffer_size(buffer);
- }
+ buffer_sequence_adapter<boost::asio::const_buffer,
+ ConstBufferSequence> bufs(buffers);
// A request to receive 0 bytes on a stream socket is a no-op.
- if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
+ if (impl.protocol_.type() == SOCK_STREAM && bufs.all_empty())
{
ec = boost::system::error_code();
return 0;
@@ -579,7 +547,8 @@ public:
for (;;)
{
// Try to complete the operation without blocking.
- int bytes_sent = socket_ops::send(impl.socket_, bufs, i, flags, ec);
+ int bytes_sent = socket_ops::send(impl.socket_,
+ bufs.buffers(), bufs.count(), flags, ec);
// Check if operation succeeded.
if (bytes_sent >= 0)
@@ -613,50 +582,32 @@ public:
return 0;
}
- template <typename ConstBufferSequence, typename Handler>
- class send_operation :
- public handler_base_from_member<Handler>
+ template <typename ConstBufferSequence>
+ class send_op_base : public reactor_op
{
public:
- send_operation(socket_type socket, boost::asio::io_service& io_service,
- const ConstBufferSequence& buffers, socket_base::message_flags flags,
- Handler handler)
- : handler_base_from_member<Handler>(handler),
+ send_op_base(socket_type socket, const ConstBufferSequence& buffers,
+ socket_base::message_flags flags, func_type complete_func)
+ : reactor_op(&send_op_base::do_perform, complete_func),
socket_(socket),
- io_service_(io_service),
- work_(io_service),
buffers_(buffers),
flags_(flags)
{
}
- bool perform(boost::system::error_code& ec,
- std::size_t& bytes_transferred)
+ static bool do_perform(reactor_op* base)
{
- // Check whether the operation was successful.
- if (ec)
- {
- bytes_transferred = 0;
- return true;
- }
+ send_op_base* o(static_cast<send_op_base*>(base));
- // Copy buffers into array.
- socket_ops::buf bufs[max_buffers];
- typename ConstBufferSequence::const_iterator iter = buffers_.begin();
- typename ConstBufferSequence::const_iterator end = buffers_.end();
- size_t i = 0;
- for (; iter != end && i < max_buffers; ++iter, ++i)
- {
- boost::asio::const_buffer buffer(*iter);
- socket_ops::init_buf(bufs[i],
- boost::asio::buffer_cast<const void*>(buffer),
- boost::asio::buffer_size(buffer));
- }
+ buffer_sequence_adapter<boost::asio::const_buffer,
+ ConstBufferSequence> bufs(o->buffers_);
for (;;)
{
// Send the data.
- int bytes = socket_ops::send(socket_, bufs, i, flags_, ec);
+ boost::system::error_code ec;
+ int bytes = socket_ops::send(o->socket_,
+ bufs.buffers(), bufs.count(), o->flags_, ec);
// Retry operation if interrupted by signal.
if (ec == boost::asio::error::interrupted)
@@ -667,127 +618,92 @@ public:
|| ec == boost::asio::error::try_again)
return false;
- bytes_transferred = (bytes < 0 ? 0 : bytes);
+ o->ec_ = ec;
+ o->bytes_transferred_ = (bytes < 0 ? 0 : bytes);
return true;
}
}
- void complete(const boost::system::error_code& ec,
- std::size_t bytes_transferred)
- {
- io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
- }
-
private:
socket_type socket_;
- boost::asio::io_service& io_service_;
- boost::asio::io_service::work work_;
ConstBufferSequence buffers_;
socket_base::message_flags flags_;
};
- // Start an asynchronous send. The data being sent must be valid for the
- // lifetime of the asynchronous operation.
template <typename ConstBufferSequence, typename Handler>
- void async_send(implementation_type& impl, const ConstBufferSequence& buffers,
- socket_base::message_flags flags, Handler handler)
- {
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- }
- else
- {
- if (impl.protocol_.type() == SOCK_STREAM)
- {
- // Determine total size of buffers.
- typename ConstBufferSequence::const_iterator iter = buffers.begin();
- typename ConstBufferSequence::const_iterator end = buffers.end();
- size_t i = 0;
- size_t total_buffer_size = 0;
- for (; iter != end && i < max_buffers; ++iter, ++i)
- {
- boost::asio::const_buffer buffer(*iter);
- total_buffer_size += boost::asio::buffer_size(buffer);
- }
-
- // A request to receive 0 bytes on a stream socket is a no-op.
- if (total_buffer_size == 0)
- {
- this->get_io_service().post(bind_handler(handler,
- boost::system::error_code(), 0));
- return;
- }
- }
-
- // Make socket non-blocking.
- if (!(impl.flags_ & implementation_type::internal_non_blocking))
- {
- if (!(impl.flags_ & implementation_type::non_blocking))
- {
- ioctl_arg_type non_blocking = 1;
- boost::system::error_code ec;
- if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
- {
- this->get_io_service().post(bind_handler(handler, ec, 0));
- return;
- }
- }
- impl.flags_ |= implementation_type::internal_non_blocking;
- }
-
- reactor_.start_write_op(impl.socket_, impl.reactor_data_,
- send_operation<ConstBufferSequence, Handler>(
- impl.socket_, this->get_io_service(), buffers, flags, handler));
- }
- }
-
- template <typename Handler>
- class null_buffers_operation :
- public handler_base_from_member<Handler>
+ class send_op : public send_op_base<ConstBufferSequence>
{
public:
- null_buffers_operation(boost::asio::io_service& io_service, Handler handler)
- : handler_base_from_member<Handler>(handler),
- work_(io_service)
+ send_op(socket_type socket, const ConstBufferSequence& buffers,
+ socket_base::message_flags flags, Handler handler)
+ : send_op_base<ConstBufferSequence>(socket,
+ buffers, flags, &send_op::do_complete),
+ handler_(handler)
{
}
- bool perform(boost::system::error_code&,
- std::size_t& bytes_transferred)
+ static void do_complete(io_service_impl* owner, operation* base,
+ boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/)
{
- bytes_transferred = 0;
- return true;
- }
+ // Take ownership of the handler object.
+ send_op* o(static_cast<send_op*>(base));
+ typedef handler_alloc_traits<Handler, send_op> alloc_traits;
+ handler_ptr<alloc_traits> ptr(o->handler_, o);
- void complete(const boost::system::error_code& ec,
- std::size_t bytes_transferred)
- {
- work_.get_io_service().post(bind_handler(
- this->handler_, ec, bytes_transferred));
+ // Make the upcall if required.
+ if (owner)
+ {
+ // Make a copy of the handler so that the memory can be deallocated
+ // before the upcall is made. Even if we're not about to make an
+ // upcall, a sub-object of the handler may be the true owner of the
+ // memory associated with the handler. Consequently, a local copy of
+ // the handler is required to ensure that any owning sub-object remains
+ // valid until after we have deallocated the memory here.
+ detail::binder2<Handler, boost::system::error_code, std::size_t>
+ handler(o->handler_, o->ec_, o->bytes_transferred_);
+ ptr.reset();
+ boost::asio::detail::fenced_block b;
+ boost_asio_handler_invoke_helpers::invoke(handler, handler);
+ }
}
private:
- boost::asio::io_service::work work_;
+ Handler handler_;
};
+ // Start an asynchronous send. The data being sent must be valid for the
+ // lifetime of the asynchronous operation.
+ template <typename ConstBufferSequence, typename Handler>
+ void async_send(implementation_type& impl, const ConstBufferSequence& buffers,
+ socket_base::message_flags flags, Handler handler)
+ {
+ // Allocate and construct an operation to wrap the handler.
+ typedef send_op<ConstBufferSequence, Handler> value_type;
+ typedef handler_alloc_traits<Handler, value_type> alloc_traits;
+ raw_handler_ptr<alloc_traits> raw_ptr(handler);
+ handler_ptr<alloc_traits> ptr(raw_ptr,
+ impl.socket_, buffers, flags, handler);
+
+ start_op(impl, reactor::write_op, ptr.get(), true,
+ (impl.protocol_.type() == SOCK_STREAM
+ && buffer_sequence_adapter<boost::asio::const_buffer,
+ ConstBufferSequence>::all_empty(buffers)));
+ ptr.release();
+ }
+
// Start an asynchronous wait until data can be sent without blocking.
template <typename Handler>
void async_send(implementation_type& impl, const null_buffers&,
socket_base::message_flags, Handler handler)
{
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- }
- else
- {
- reactor_.start_write_op(impl.socket_, impl.reactor_data_,
- null_buffers_operation<Handler>(this->get_io_service(), handler),
- false);
- }
+ // Allocate and construct an operation to wrap the handler.
+ typedef null_buffers_op<Handler> value_type;
+ typedef handler_alloc_traits<Handler, value_type> alloc_traits;
+ raw_handler_ptr<alloc_traits> raw_ptr(handler);
+ handler_ptr<alloc_traits> ptr(raw_ptr, handler);
+
+ start_op(impl, reactor::write_op, ptr.get(), false, false);
+ ptr.release();
}
// Send a datagram to the specified endpoint. Returns the number of bytes
@@ -803,25 +719,15 @@ public:
return 0;
}
- // Copy buffers into array.
- socket_ops::buf bufs[max_buffers];
- typename ConstBufferSequence::const_iterator iter = buffers.begin();
- typename ConstBufferSequence::const_iterator end = buffers.end();
- size_t i = 0;
- for (; iter != end && i < max_buffers; ++iter, ++i)
- {
- boost::asio::const_buffer buffer(*iter);
- socket_ops::init_buf(bufs[i],
- boost::asio::buffer_cast<const void*>(buffer),
- boost::asio::buffer_size(buffer));
- }
+ buffer_sequence_adapter<boost::asio::const_buffer,
+ ConstBufferSequence> bufs(buffers);
// Send the data.
for (;;)
{
// Try to complete the operation without blocking.
- int bytes_sent = socket_ops::sendto(impl.socket_, bufs, i, flags,
- destination.data(), destination.size(), ec);
+ int bytes_sent = socket_ops::sendto(impl.socket_, bufs.buffers(),
+ bufs.count(), flags, destination.data(), destination.size(), ec);
// Check if operation succeeded.
if (bytes_sent >= 0)
@@ -856,52 +762,34 @@ public:
return 0;
}
- template <typename ConstBufferSequence, typename Handler>
- class send_to_operation :
- public handler_base_from_member<Handler>
+ template <typename ConstBufferSequence>
+ class send_to_op_base : public reactor_op
{
public:
- send_to_operation(socket_type socket, boost::asio::io_service& io_service,
- const ConstBufferSequence& buffers, const endpoint_type& endpoint,
- socket_base::message_flags flags, Handler handler)
- : handler_base_from_member<Handler>(handler),
+ send_to_op_base(socket_type socket, const ConstBufferSequence& buffers,
+ const endpoint_type& endpoint, socket_base::message_flags flags,
+ func_type complete_func)
+ : reactor_op(&send_to_op_base::do_perform, complete_func),
socket_(socket),
- io_service_(io_service),
- work_(io_service),
buffers_(buffers),
destination_(endpoint),
flags_(flags)
{
}
- bool perform(boost::system::error_code& ec,
- std::size_t& bytes_transferred)
+ static bool do_perform(reactor_op* base)
{
- // Check whether the operation was successful.
- if (ec)
- {
- bytes_transferred = 0;
- return true;
- }
+ send_to_op_base* o(static_cast<send_to_op_base*>(base));
- // Copy buffers into array.
- socket_ops::buf bufs[max_buffers];
- typename ConstBufferSequence::const_iterator iter = buffers_.begin();
- typename ConstBufferSequence::const_iterator end = buffers_.end();
- size_t i = 0;
- for (; iter != end && i < max_buffers; ++iter, ++i)
- {
- boost::asio::const_buffer buffer(*iter);
- socket_ops::init_buf(bufs[i],
- boost::asio::buffer_cast<const void*>(buffer),
- boost::asio::buffer_size(buffer));
- }
+ buffer_sequence_adapter<boost::asio::const_buffer,
+ ConstBufferSequence> bufs(o->buffers_);
for (;;)
{
// Send the data.
- int bytes = socket_ops::sendto(socket_, bufs, i, flags_,
- destination_.data(), destination_.size(), ec);
+ boost::system::error_code ec;
+ int bytes = socket_ops::sendto(o->socket_, bufs.buffers(), bufs.count(),
+ o->flags_, o->destination_.data(), o->destination_.size(), ec);
// Retry operation if interrupted by signal.
if (ec == boost::asio::error::interrupted)
@@ -912,62 +800,78 @@ public:
|| ec == boost::asio::error::try_again)
return false;
- bytes_transferred = (bytes < 0 ? 0 : bytes);
+ o->ec_ = ec;
+ o->bytes_transferred_ = (bytes < 0 ? 0 : bytes);
return true;
}
}
- void complete(const boost::system::error_code& ec,
- std::size_t bytes_transferred)
- {
- io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
- }
-
private:
socket_type socket_;
- boost::asio::io_service& io_service_;
- boost::asio::io_service::work work_;
ConstBufferSequence buffers_;
endpoint_type destination_;
socket_base::message_flags flags_;
};
- // Start an asynchronous send. The data being sent must be valid for the
- // lifetime of the asynchronous operation.
template <typename ConstBufferSequence, typename Handler>
- void async_send_to(implementation_type& impl,
- const ConstBufferSequence& buffers,
- const endpoint_type& destination, socket_base::message_flags flags,
- Handler handler)
+ class send_to_op : public send_to_op_base<ConstBufferSequence>
{
- if (!is_open(impl))
+ public:
+ send_to_op(socket_type socket, const ConstBufferSequence& buffers,
+ const endpoint_type& endpoint, socket_base::message_flags flags,
+ Handler handler)
+ : send_to_op_base<ConstBufferSequence>(socket,
+ buffers, endpoint, flags, &send_to_op::do_complete),
+ handler_(handler)
{
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
}
- else
+
+ static void do_complete(io_service_impl* owner, operation* base,
+ boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/)
{
- // Make socket non-blocking.
- if (!(impl.flags_ & implementation_type::internal_non_blocking))
+ // Take ownership of the handler object.
+ send_to_op* o(static_cast<send_to_op*>(base));
+ typedef handler_alloc_traits<Handler, send_to_op> alloc_traits;
+ handler_ptr<alloc_traits> ptr(o->handler_, o);
+
+ // Make the upcall if required.
+ if (owner)
{
- if (!(impl.flags_ & implementation_type::non_blocking))
- {
- ioctl_arg_type non_blocking = 1;
- boost::system::error_code ec;
- if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
- {
- this->get_io_service().post(bind_handler(handler, ec, 0));
- return;
- }
- }
- impl.flags_ |= implementation_type::internal_non_blocking;
+ // Make a copy of the handler so that the memory can be deallocated
+ // before the upcall is made. Even if we're not about to make an
+ // upcall, a sub-object of the handler may be the true owner of the
+ // memory associated with the handler. Consequently, a local copy of
+ // the handler is required to ensure that any owning sub-object remains
+ // valid until after we have deallocated the memory here.
+ detail::binder2<Handler, boost::system::error_code, std::size_t>
+ handler(o->handler_, o->ec_, o->bytes_transferred_);
+ ptr.reset();
+ boost::asio::detail::fenced_block b;
+ boost_asio_handler_invoke_helpers::invoke(handler, handler);
}
-
- reactor_.start_write_op(impl.socket_, impl.reactor_data_,
- send_to_operation<ConstBufferSequence, Handler>(
- impl.socket_, this->get_io_service(), buffers,
- destination, flags, handler));
}
+
+ private:
+ Handler handler_;
+ };
+
+ // Start an asynchronous send. The data being sent must be valid for the
+ // lifetime of the asynchronous operation.
+ template <typename ConstBufferSequence, typename Handler>
+ void async_send_to(implementation_type& impl,
+ const ConstBufferSequence& buffers,
+ const endpoint_type& destination, socket_base::message_flags flags,
+ Handler handler)
+ {
+ // Allocate and construct an operation to wrap the handler.
+ typedef send_to_op<ConstBufferSequence, Handler> value_type;
+ typedef handler_alloc_traits<Handler, value_type> alloc_traits;
+ raw_handler_ptr<alloc_traits> raw_ptr(handler);
+ handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_,
+ buffers, destination, flags, handler);
+
+ start_op(impl, reactor::write_op, ptr.get(), true, false);
+ ptr.release();
}
// Start an asynchronous wait until data can be sent without blocking.
@@ -975,17 +879,14 @@ public:
void async_send_to(implementation_type& impl, const null_buffers&,
socket_base::message_flags, const endpoint_type&, Handler handler)
{
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- }
- else
- {
- reactor_.start_write_op(impl.socket_, impl.reactor_data_,
- null_buffers_operation<Handler>(this->get_io_service(), handler),
- false);
- }
+ // Allocate and construct an operation to wrap the handler.
+ typedef null_buffers_op<Handler> value_type;
+ typedef handler_alloc_traits<Handler, value_type> alloc_traits;
+ raw_handler_ptr<alloc_traits> raw_ptr(handler);
+ handler_ptr<alloc_traits> ptr(raw_ptr, handler);
+
+ start_op(impl, reactor::write_op, ptr.get(), false, false);
+ ptr.release();
}
// Receive some data from the peer. Returns the number of bytes received.
@@ -1000,23 +901,11 @@ public:
return 0;
}
- // Copy buffers into array.
- socket_ops::buf bufs[max_buffers];
- typename MutableBufferSequence::const_iterator iter = buffers.begin();
- typename MutableBufferSequence::const_iterator end = buffers.end();
- size_t i = 0;
- size_t total_buffer_size = 0;
- for (; iter != end && i < max_buffers; ++iter, ++i)
- {
- boost::asio::mutable_buffer buffer(*iter);
- socket_ops::init_buf(bufs[i],
- boost::asio::buffer_cast<void*>(buffer),
- boost::asio::buffer_size(buffer));
- total_buffer_size += boost::asio::buffer_size(buffer);
- }
+ buffer_sequence_adapter<boost::asio::mutable_buffer,
+ MutableBufferSequence> bufs(buffers);
// A request to receive 0 bytes on a stream socket is a no-op.
- if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
+ if (impl.protocol_.type() == SOCK_STREAM && bufs.all_empty())
{
ec = boost::system::error_code();
return 0;
@@ -1026,7 +915,8 @@ public:
for (;;)
{
// Try to complete the operation without blocking.
- int bytes_recvd = socket_ops::recv(impl.socket_, bufs, i, flags, ec);
+ int bytes_recvd = socket_ops::recv(impl.socket_,
+ bufs.buffers(), bufs.count(), flags, ec);
// Check if operation succeeded.
if (bytes_recvd > 0)
@@ -1067,53 +957,35 @@ public:
return 0;
}
- template <typename MutableBufferSequence, typename Handler>
- class receive_operation :
- public handler_base_from_member<Handler>
+ template <typename MutableBufferSequence>
+ class receive_op_base : public reactor_op
{
public:
- receive_operation(socket_type socket, int protocol_type,
- boost::asio::io_service& io_service,
+ receive_op_base(socket_type socket, int protocol_type,
const MutableBufferSequence& buffers,
- socket_base::message_flags flags, Handler handler)
- : handler_base_from_member<Handler>(handler),
+ socket_base::message_flags flags, func_type complete_func)
+ : reactor_op(&receive_op_base::do_perform, complete_func),
socket_(socket),
protocol_type_(protocol_type),
- io_service_(io_service),
- work_(io_service),
buffers_(buffers),
flags_(flags)
{
}
- bool perform(boost::system::error_code& ec,
- std::size_t& bytes_transferred)
+ static bool do_perform(reactor_op* base)
{
- // Check whether the operation was successful.
- if (ec)
- {
- bytes_transferred = 0;
- return true;
- }
+ receive_op_base* o(static_cast<receive_op_base*>(base));
- // Copy buffers into array.
- socket_ops::buf bufs[max_buffers];
- typename MutableBufferSequence::const_iterator iter = buffers_.begin();
- typename MutableBufferSequence::const_iterator end = buffers_.end();
- size_t i = 0;
- for (; iter != end && i < max_buffers; ++iter, ++i)
- {
- boost::asio::mutable_buffer buffer(*iter);
- socket_ops::init_buf(bufs[i],
- boost::asio::buffer_cast<void*>(buffer),
- boost::asio::buffer_size(buffer));
- }
+ buffer_sequence_adapter<boost::asio::mutable_buffer,
+ MutableBufferSequence> bufs(o->buffers_);
for (;;)
{
// Receive some data.
- int bytes = socket_ops::recv(socket_, bufs, i, flags_, ec);
- if (bytes == 0 && protocol_type_ == SOCK_STREAM)
+ boost::system::error_code ec;
+ int bytes = socket_ops::recv(o->socket_,
+ bufs.buffers(), bufs.count(), o->flags_, ec);
+ if (bytes == 0 && o->protocol_type_ == SOCK_STREAM)
ec = boost::asio::error::eof;
// Retry operation if interrupted by signal.
@@ -1125,93 +997,84 @@ public:
|| ec == boost::asio::error::try_again)
return false;
- bytes_transferred = (bytes < 0 ? 0 : bytes);
+ o->ec_ = ec;
+ o->bytes_transferred_ = (bytes < 0 ? 0 : bytes);
return true;
}
}
- void complete(const boost::system::error_code& ec,
- std::size_t bytes_transferred)
- {
- io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
- }
-
private:
socket_type socket_;
int protocol_type_;
- boost::asio::io_service& io_service_;
- boost::asio::io_service::work work_;
MutableBufferSequence buffers_;
socket_base::message_flags flags_;
};
- // Start an asynchronous receive. The buffer for the data being received
- // must be valid for the lifetime of the asynchronous operation.
template <typename MutableBufferSequence, typename Handler>
- void async_receive(implementation_type& impl,
- const MutableBufferSequence& buffers,
- socket_base::message_flags flags, Handler handler)
+ class receive_op : public receive_op_base<MutableBufferSequence>
{
- if (!is_open(impl))
+ public:
+ receive_op(socket_type socket, int protocol_type,
+ const MutableBufferSequence& buffers,
+ socket_base::message_flags flags, Handler handler)
+ : receive_op_base<MutableBufferSequence>(socket,
+ protocol_type, buffers, flags, &receive_op::do_complete),
+ handler_(handler)
{
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
}
- else
- {
- if (impl.protocol_.type() == SOCK_STREAM)
- {
- // Determine total size of buffers.
- typename MutableBufferSequence::const_iterator iter = buffers.begin();
- typename MutableBufferSequence::const_iterator end = buffers.end();
- size_t i = 0;
- size_t total_buffer_size = 0;
- for (; iter != end && i < max_buffers; ++iter, ++i)
- {
- boost::asio::mutable_buffer buffer(*iter);
- total_buffer_size += boost::asio::buffer_size(buffer);
- }
-
- // A request to receive 0 bytes on a stream socket is a no-op.
- if (total_buffer_size == 0)
- {
- this->get_io_service().post(bind_handler(handler,
- boost::system::error_code(), 0));
- return;
- }
- }
- // Make socket non-blocking.
- if (!(impl.flags_ & implementation_type::internal_non_blocking))
- {
- if (!(impl.flags_ & implementation_type::non_blocking))
- {
- ioctl_arg_type non_blocking = 1;
- boost::system::error_code ec;
- if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
- {
- this->get_io_service().post(bind_handler(handler, ec, 0));
- return;
- }
- }
- impl.flags_ |= implementation_type::internal_non_blocking;
- }
+ static void do_complete(io_service_impl* owner, operation* base,
+ boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/)
+ {
+ // Take ownership of the handler object.
+ receive_op* o(static_cast<receive_op*>(base));
+ typedef handler_alloc_traits<Handler, receive_op> alloc_traits;
+ handler_ptr<alloc_traits> ptr(o->handler_, o);
- if (flags & socket_base::message_out_of_band)
+ // Make the upcall if required.
+ if (owner)
{
- reactor_.start_except_op(impl.socket_, impl.reactor_data_,
- receive_operation<MutableBufferSequence, Handler>(
- impl.socket_, impl.protocol_.type(),
- this->get_io_service(), buffers, flags, handler));
- }
- else
- {
- reactor_.start_read_op(impl.socket_, impl.reactor_data_,
- receive_operation<MutableBufferSequence, Handler>(
- impl.socket_, impl.protocol_.type(),
- this->get_io_service(), buffers, flags, handler));
+ // Make a copy of the handler so that the memory can be deallocated
+ // before the upcall is made. Even if we're not about to make an
+ // upcall, a sub-object of the handler may be the true owner of the
+ // memory associated with the handler. Consequently, a local copy of
+ // the handler is required to ensure that any owning sub-object remains
+ // valid until after we have deallocated the memory here.
+ detail::binder2<Handler, boost::system::error_code, std::size_t>
+ handler(o->handler_, o->ec_, o->bytes_transferred_);
+ ptr.reset();
+ boost::asio::detail::fenced_block b;
+ boost_asio_handler_invoke_helpers::invoke(handler, handler);
}
}
+
+ private:
+ Handler handler_;
+ };
+
+ // Start an asynchronous receive. The buffer for the data being received
+ // must be valid for the lifetime of the asynchronous operation.
+ template <typename MutableBufferSequence, typename Handler>
+ void async_receive(implementation_type& impl,
+ const MutableBufferSequence& buffers,
+ socket_base::message_flags flags, Handler handler)
+ {
+ // Allocate and construct an operation to wrap the handler.
+ typedef receive_op<MutableBufferSequence, Handler> value_type;
+ typedef handler_alloc_traits<Handler, value_type> alloc_traits;
+ raw_handler_ptr<alloc_traits> raw_ptr(handler);
+ int protocol_type = impl.protocol_.type();
+ handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_,
+ protocol_type, buffers, flags, handler);
+
+ start_op(impl,
+ (flags & socket_base::message_out_of_band)
+ ? reactor::except_op : reactor::read_op,
+ ptr.get(), (flags & socket_base::message_out_of_band) == 0,
+ (impl.protocol_.type() == SOCK_STREAM
+ && buffer_sequence_adapter<boost::asio::mutable_buffer,
+ MutableBufferSequence>::all_empty(buffers)));
+ ptr.release();
}
// Wait until data can be received without blocking.
@@ -1219,22 +1082,17 @@ public:
void async_receive(implementation_type& impl, const null_buffers&,
socket_base::message_flags flags, Handler handler)
{
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- }
- else if (flags & socket_base::message_out_of_band)
- {
- reactor_.start_except_op(impl.socket_, impl.reactor_data_,
- null_buffers_operation<Handler>(this->get_io_service(), handler));
- }
- else
- {
- reactor_.start_read_op(impl.socket_, impl.reactor_data_,
- null_buffers_operation<Handler>(this->get_io_service(), handler),
- false);
- }
+ // Allocate and construct an operation to wrap the handler.
+ typedef null_buffers_op<Handler> value_type;
+ typedef handler_alloc_traits<Handler, value_type> alloc_traits;
+ raw_handler_ptr<alloc_traits> raw_ptr(handler);
+ handler_ptr<alloc_traits> ptr(raw_ptr, handler);
+
+ start_op(impl,
+ (flags & socket_base::message_out_of_band)
+ ? reactor::except_op : reactor::read_op,
+ ptr.get(), false, false);
+ ptr.release();
}
// Receive a datagram with the endpoint of the sender. Returns the number of
@@ -1251,26 +1109,16 @@ public:
return 0;
}
- // Copy buffers into array.
- socket_ops::buf bufs[max_buffers];
- typename MutableBufferSequence::const_iterator iter = buffers.begin();
- typename MutableBufferSequence::const_iterator end = buffers.end();
- size_t i = 0;
- for (; iter != end && i < max_buffers; ++iter, ++i)
- {
- boost::asio::mutable_buffer buffer(*iter);
- socket_ops::init_buf(bufs[i],
- boost::asio::buffer_cast<void*>(buffer),
- boost::asio::buffer_size(buffer));
- }
+ buffer_sequence_adapter<boost::asio::mutable_buffer,
+ MutableBufferSequence> bufs(buffers);
// Receive some data.
for (;;)
{
// Try to complete the operation without blocking.
std::size_t addr_len = sender_endpoint.capacity();
- int bytes_recvd = socket_ops::recvfrom(impl.socket_, bufs, i, flags,
- sender_endpoint.data(), &addr_len, ec);
+ int bytes_recvd = socket_ops::recvfrom(impl.socket_, bufs.buffers(),
+ bufs.count(), flags, sender_endpoint.data(), &addr_len, ec);
// Check if operation succeeded.
if (bytes_recvd > 0)
@@ -1318,56 +1166,37 @@ public:
return 0;
}
- template <typename MutableBufferSequence, typename Handler>
- class receive_from_operation :
- public handler_base_from_member<Handler>
+ template <typename MutableBufferSequence>
+ class receive_from_op_base : public reactor_op
{
public:
- receive_from_operation(socket_type socket, int protocol_type,
- boost::asio::io_service& io_service,
+ receive_from_op_base(socket_type socket, int protocol_type,
const MutableBufferSequence& buffers, endpoint_type& endpoint,
- socket_base::message_flags flags, Handler handler)
- : handler_base_from_member<Handler>(handler),
+ socket_base::message_flags flags, func_type complete_func)
+ : reactor_op(&receive_from_op_base::do_perform, complete_func),
socket_(socket),
protocol_type_(protocol_type),
- io_service_(io_service),
- work_(io_service),
buffers_(buffers),
sender_endpoint_(endpoint),
flags_(flags)
{
}
- bool perform(boost::system::error_code& ec,
- std::size_t& bytes_transferred)
+ static bool do_perform(reactor_op* base)
{
- // Check whether the operation was successful.
- if (ec)
- {
- bytes_transferred = 0;
- return true;
- }
+ receive_from_op_base* o(static_cast<receive_from_op_base*>(base));
- // Copy buffers into array.
- socket_ops::buf bufs[max_buffers];
- typename MutableBufferSequence::const_iterator iter = buffers_.begin();
- typename MutableBufferSequence::const_iterator end = buffers_.end();
- size_t i = 0;
- for (; iter != end && i < max_buffers; ++iter, ++i)
- {
- boost::asio::mutable_buffer buffer(*iter);
- socket_ops::init_buf(bufs[i],
- boost::asio::buffer_cast<void*>(buffer),
- boost::asio::buffer_size(buffer));
- }
+ buffer_sequence_adapter<boost::asio::mutable_buffer,
+ MutableBufferSequence> bufs(o->buffers_);
for (;;)
{
// Receive some data.
- std::size_t addr_len = sender_endpoint_.capacity();
- int bytes = socket_ops::recvfrom(socket_, bufs, i, flags_,
- sender_endpoint_.data(), &addr_len, ec);
- if (bytes == 0 && protocol_type_ == SOCK_STREAM)
+ boost::system::error_code ec;
+ std::size_t addr_len = o->sender_endpoint_.capacity();
+ int bytes = socket_ops::recvfrom(o->socket_, bufs.buffers(),
+ bufs.count(), o->flags_, o->sender_endpoint_.data(), &addr_len, ec);
+ if (bytes == 0 && o->protocol_type_ == SOCK_STREAM)
ec = boost::asio::error::eof;
// Retry operation if interrupted by signal.
@@ -1379,64 +1208,84 @@ public:
|| ec == boost::asio::error::try_again)
return false;
- sender_endpoint_.resize(addr_len);
- bytes_transferred = (bytes < 0 ? 0 : bytes);
+ o->sender_endpoint_.resize(addr_len);
+ o->ec_ = ec;
+ o->bytes_transferred_ = (bytes < 0 ? 0 : bytes);
return true;
}
}
- void complete(const boost::system::error_code& ec,
- std::size_t bytes_transferred)
- {
- io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
- }
-
private:
socket_type socket_;
int protocol_type_;
- boost::asio::io_service& io_service_;
- boost::asio::io_service::work work_;
MutableBufferSequence buffers_;
endpoint_type& sender_endpoint_;
socket_base::message_flags flags_;
};
- // Start an asynchronous receive. The buffer for the data being received and
- // the sender_endpoint object must both be valid for the lifetime of the
- // asynchronous operation.
template <typename MutableBufferSequence, typename Handler>
- void async_receive_from(implementation_type& impl,
- const MutableBufferSequence& buffers, endpoint_type& sender_endpoint,
- socket_base::message_flags flags, Handler handler)
+ class receive_from_op : public receive_from_op_base<MutableBufferSequence>
{
- if (!is_open(impl))
+ public:
+ receive_from_op(socket_type socket, int protocol_type,
+ const MutableBufferSequence& buffers, endpoint_type& endpoint,
+ socket_base::message_flags flags, Handler handler)
+ : receive_from_op_base<MutableBufferSequence>(socket, protocol_type,
+ buffers, endpoint, flags, &receive_from_op::do_complete),
+ handler_(handler)
{
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
}
- else
+
+ static void do_complete(io_service_impl* owner, operation* base,
+ boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/)
{
- // Make socket non-blocking.
- if (!(impl.flags_ & implementation_type::internal_non_blocking))
+ // Take ownership of the handler object.
+ receive_from_op* o(static_cast<receive_from_op*>(base));
+ typedef handler_alloc_traits<Handler, receive_from_op> alloc_traits;
+ handler_ptr<alloc_traits> ptr(o->handler_, o);
+
+ // Make the upcall if required.
+ if (owner)
{
- if (!(impl.flags_ & implementation_type::non_blocking))
- {
- ioctl_arg_type non_blocking = 1;
- boost::system::error_code ec;
- if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
- {
- this->get_io_service().post(bind_handler(handler, ec, 0));
- return;
- }
- }
- impl.flags_ |= implementation_type::internal_non_blocking;
+ // Make a copy of the handler so that the memory can be deallocated
+ // before the upcall is made. Even if we're not about to make an
+ // upcall, a sub-object of the handler may be the true owner of the
+ // memory associated with the handler. Consequently, a local copy of
+ // the handler is required to ensure that any owning sub-object remains
+ // valid until after we have deallocated the memory here.
+ detail::binder2<Handler, boost::system::error_code, std::size_t>
+ handler(o->handler_, o->ec_, o->bytes_transferred_);
+ ptr.reset();
+ boost::asio::detail::fenced_block b;
+ boost_asio_handler_invoke_helpers::invoke(handler, handler);
}
-
- reactor_.start_read_op(impl.socket_, impl.reactor_data_,
- receive_from_operation<MutableBufferSequence, Handler>(
- impl.socket_, impl.protocol_.type(), this->get_io_service(),
- buffers, sender_endpoint, flags, handler));
}
+
+ private:
+ Handler handler_;
+ };
+
+ // Start an asynchronous receive. The buffer for the data being received and
+ // the sender_endpoint object must both be valid for the lifetime of the
+ // asynchronous operation.
+ template <typename MutableBufferSequence, typename Handler>
+ void async_receive_from(implementation_type& impl,
+ const MutableBufferSequence& buffers, endpoint_type& sender_endpoint,
+ socket_base::message_flags flags, Handler handler)
+ {
+ // Allocate and construct an operation to wrap the handler.
+ typedef receive_from_op<MutableBufferSequence, Handler> value_type;
+ typedef handler_alloc_traits<Handler, value_type> alloc_traits;
+ raw_handler_ptr<alloc_traits> raw_ptr(handler);
+ int protocol_type = impl.protocol_.type();
+ handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_,
+ protocol_type, buffers, sender_endpoint, flags, handler);
+
+ start_op(impl,
+ (flags & socket_base::message_out_of_band)
+ ? reactor::except_op : reactor::read_op,
+ ptr.get(), true, false);
+ ptr.release();
}
// Wait until data can be received without blocking.
@@ -1445,28 +1294,20 @@ public:
const null_buffers&, endpoint_type& sender_endpoint,
socket_base::message_flags flags, Handler handler)
{
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- }
- else
- {
- // Reset endpoint since it can be given no sensible value at this time.
- sender_endpoint = endpoint_type();
+ // Allocate and construct an operation to wrap the handler.
+ typedef null_buffers_op<Handler> value_type;
+ typedef handler_alloc_traits<Handler, value_type> alloc_traits;
+ raw_handler_ptr<alloc_traits> raw_ptr(handler);
+ handler_ptr<alloc_traits> ptr(raw_ptr, handler);
- if (flags & socket_base::message_out_of_band)
- {
- reactor_.start_except_op(impl.socket_, impl.reactor_data_,
- null_buffers_operation<Handler>(this->get_io_service(), handler));
- }
- else
- {
- reactor_.start_read_op(impl.socket_, impl.reactor_data_,
- null_buffers_operation<Handler>(this->get_io_service(), handler),
- false);
- }
- }
+ // Reset endpoint since it can be given no sensible value at this time.
+ sender_endpoint = endpoint_type();
+
+ start_op(impl,
+ (flags & socket_base::message_out_of_band)
+ ? reactor::except_op : reactor::read_op,
+ ptr.get(), false, false);
+ ptr.release();
}
// Accept a new connection.
@@ -1546,19 +1387,15 @@ public:
}
}
- template <typename Socket, typename Handler>
- class accept_operation :
- public handler_base_from_member<Handler>
+ template <typename Socket>
+ class accept_op_base : public reactor_op
{
public:
- accept_operation(socket_type socket, boost::asio::io_service& io_service,
- Socket& peer, const protocol_type& protocol,
- endpoint_type* peer_endpoint, bool enable_connection_aborted,
- Handler handler)
- : handler_base_from_member<Handler>(handler),
+ accept_op_base(socket_type socket, Socket& peer,
+ const protocol_type& protocol, endpoint_type* peer_endpoint,
+ bool enable_connection_aborted, func_type complete_func)
+ : reactor_op(&accept_op_base::do_perform, complete_func),
socket_(socket),
- io_service_(io_service),
- work_(io_service),
peer_(peer),
protocol_(protocol),
peer_endpoint_(peer_endpoint),
@@ -1566,27 +1403,25 @@ public:
{
}
- bool perform(boost::system::error_code& ec, std::size_t&)
+ static bool do_perform(reactor_op* base)
{
- // Check whether the operation was successful.
- if (ec)
- return true;
+ accept_op_base* o(static_cast<accept_op_base*>(base));
for (;;)
{
// Accept the waiting connection.
+ boost::system::error_code ec;
socket_holder new_socket;
std::size_t addr_len = 0;
- if (peer_endpoint_)
- {
- addr_len = peer_endpoint_->capacity();
- new_socket.reset(socket_ops::accept(socket_,
- peer_endpoint_->data(), &addr_len, ec));
- }
- else
+ std::size_t* addr_len_p = 0;
+ socket_addr_type* addr = 0;
+ if (o->peer_endpoint_)
{
- new_socket.reset(socket_ops::accept(socket_, 0, 0, ec));
+ addr_len = o->peer_endpoint_->capacity();
+ addr_len_p = &addr_len;
+ addr = o->peer_endpoint_->data();
}
+ new_socket.reset(socket_ops::accept(o->socket_, addr, addr_len_p, ec));
// Retry operation if interrupted by signal.
if (ec == boost::asio::error::interrupted)
@@ -1597,83 +1432,95 @@ public:
|| ec == boost::asio::error::try_again)
return false;
if (ec == boost::asio::error::connection_aborted
- && !enable_connection_aborted_)
+ && !o->enable_connection_aborted_)
return false;
#if defined(EPROTO)
- if (ec.value() == EPROTO && !enable_connection_aborted_)
+ if (ec.value() == EPROTO && !o->enable_connection_aborted_)
return false;
#endif // defined(EPROTO)
// Transfer ownership of the new socket to the peer object.
if (!ec)
{
- if (peer_endpoint_)
- peer_endpoint_->resize(addr_len);
- peer_.assign(protocol_, new_socket.get(), ec);
+ if (o->peer_endpoint_)
+ o->peer_endpoint_->resize(addr_len);
+ o->peer_.assign(o->protocol_, new_socket.get(), ec);
if (!ec)
new_socket.release();
}
+ o->ec_ = ec;
return true;
}
}
- void complete(const boost::system::error_code& ec, std::size_t)
- {
- io_service_.post(bind_handler(this->handler_, ec));
- }
-
private:
socket_type socket_;
- boost::asio::io_service& io_service_;
- boost::asio::io_service::work work_;
Socket& peer_;
protocol_type protocol_;
endpoint_type* peer_endpoint_;
bool enable_connection_aborted_;
};
- // Start an asynchronous accept. The peer and peer_endpoint objects
- // must be valid until the accept's handler is invoked.
template <typename Socket, typename Handler>
- void async_accept(implementation_type& impl, Socket& peer,
- endpoint_type* peer_endpoint, Handler handler)
+ class accept_op : public accept_op_base<Socket>
{
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor));
- }
- else if (peer.is_open())
+ public:
+ accept_op(socket_type socket, Socket& peer, const protocol_type& protocol,
+ endpoint_type* peer_endpoint, bool enable_connection_aborted,
+ Handler handler)
+ : accept_op_base<Socket>(socket, peer, protocol, peer_endpoint,
+ enable_connection_aborted, &accept_op::do_complete),
+ handler_(handler)
{
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::already_open));
}
- else
+
+ static void do_complete(io_service_impl* owner, operation* base,
+ boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/)
{
- // Make socket non-blocking.
- if (!(impl.flags_ & implementation_type::internal_non_blocking))
+ // Take ownership of the handler object.
+ accept_op* o(static_cast<accept_op*>(base));
+ typedef handler_alloc_traits<Handler, accept_op> alloc_traits;
+ handler_ptr<alloc_traits> ptr(o->handler_, o);
+
+ // Make the upcall if required.
+ if (owner)
{
- if (!(impl.flags_ & implementation_type::non_blocking))
- {
- ioctl_arg_type non_blocking = 1;
- boost::system::error_code ec;
- if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
- {
- this->get_io_service().post(bind_handler(handler, ec));
- return;
- }
- }
- impl.flags_ |= implementation_type::internal_non_blocking;
+ // Make a copy of the handler so that the memory can be deallocated
+ // before the upcall is made. Even if we're not about to make an
+ // upcall, a sub-object of the handler may be the true owner of the
+ // memory associated with the handler. Consequently, a local copy of
+ // the handler is required to ensure that any owning sub-object remains
+ // valid until after we have deallocated the memory here.
+ detail::binder1<Handler, boost::system::error_code>
+ handler(o->handler_, o->ec_);
+ ptr.reset();
+ boost::asio::detail::fenced_block b;
+ boost_asio_handler_invoke_helpers::invoke(handler, handler);
}
-
- reactor_.start_read_op(impl.socket_, impl.reactor_data_,
- accept_operation<Socket, Handler>(
- impl.socket_, this->get_io_service(),
- peer, impl.protocol_, peer_endpoint,
- (impl.flags_ & implementation_type::enable_connection_aborted) != 0,
- handler));
}
+
+ private:
+ Handler handler_;
+ };
+
+ // Start an asynchronous accept. The peer and peer_endpoint objects
+ // must be valid until the accept's handler is invoked.
+ template <typename Socket, typename Handler>
+ void async_accept(implementation_type& impl, Socket& peer,
+ endpoint_type* peer_endpoint, Handler handler)
+ {
+ // Allocate and construct an operation to wrap the handler.
+ typedef accept_op<Socket, Handler> value_type;
+ typedef handler_alloc_traits<Handler, value_type> alloc_traits;
+ raw_handler_ptr<alloc_traits> raw_ptr(handler);
+ bool enable_connection_aborted =
+ (impl.flags_ & implementation_type::enable_connection_aborted) != 0;
+ handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_, peer,
+ impl.protocol_, peer_endpoint, enable_connection_aborted, handler);
+
+ start_accept_op(impl, ptr.get(), peer.is_open());
+ ptr.release();
}
// Connect the socket to the specified endpoint.
@@ -1713,53 +1560,77 @@ public:
return ec;
}
- template <typename Handler>
- class connect_operation :
- public handler_base_from_member<Handler>
+ class connect_op_base : public reactor_op
{
public:
- connect_operation(socket_type socket,
- boost::asio::io_service& io_service, Handler handler)
- : handler_base_from_member<Handler>(handler),
- socket_(socket),
- io_service_(io_service),
- work_(io_service)
+ connect_op_base(socket_type socket, func_type complete_func)
+ : reactor_op(&connect_op_base::do_perform, complete_func),
+ socket_(socket)
{
}
- bool perform(boost::system::error_code& ec, std::size_t&)
+ static bool do_perform(reactor_op* base)
{
- // Check whether the operation was successful.
- if (ec)
- return true;
+ connect_op_base* o(static_cast<connect_op_base*>(base));
// Get the error code from the connect operation.
int connect_error = 0;
size_t connect_error_len = sizeof(connect_error);
- if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR,
- &connect_error, &connect_error_len, ec) == socket_error_retval)
+ if (socket_ops::getsockopt(o->socket_, SOL_SOCKET, SO_ERROR,
+ &connect_error, &connect_error_len, o->ec_) == socket_error_retval)
return true;
// The connection failed so the handler will be posted with an error code.
if (connect_error)
{
- ec = boost::system::error_code(connect_error,
+ o->ec_ = boost::system::error_code(connect_error,
boost::asio::error::get_system_category());
- return true;
}
return true;
}
- void complete(const boost::system::error_code& ec, std::size_t)
+ private:
+ socket_type socket_;
+ };
+
+ template <typename Handler>
+ class connect_op : public connect_op_base
+ {
+ public:
+ connect_op(socket_type socket, Handler handler)
+ : connect_op_base(socket, &connect_op::do_complete),
+ handler_(handler)
{
- io_service_.post(bind_handler(this->handler_, ec));
+ }
+
+ static void do_complete(io_service_impl* owner, operation* base,
+ boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/)
+ {
+ // Take ownership of the handler object.
+ connect_op* o(static_cast<connect_op*>(base));
+ typedef handler_alloc_traits<Handler, connect_op> alloc_traits;
+ handler_ptr<alloc_traits> ptr(o->handler_, o);
+
+ // Make the upcall if required.
+ if (owner)
+ {
+ // Make a copy of the handler so that the memory can be deallocated
+ // before the upcall is made. Even if we're not about to make an
+ // upcall, a sub-object of the handler may be the true owner of the
+ // memory associated with the handler. Consequently, a local copy of
+ // the handler is required to ensure that any owning sub-object remains
+ // valid until after we have deallocated the memory here.
+ detail::binder1<Handler, boost::system::error_code>
+ handler(o->handler_, o->ec_);
+ ptr.reset();
+ boost::asio::detail::fenced_block b;
+ boost_asio_handler_invoke_helpers::invoke(handler, handler);
+ }
}
private:
- socket_type socket_;
- boost::asio::io_service& io_service_;
- boost::asio::io_service::work work_;
+ Handler handler_;
};
// Start an asynchronous connect.
@@ -1767,59 +1638,103 @@ public:
void async_connect(implementation_type& impl,
const endpoint_type& peer_endpoint, Handler handler)
{
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor));
- return;
- }
+ // Allocate and construct an operation to wrap the handler.
+ typedef connect_op<Handler> value_type;
+ typedef handler_alloc_traits<Handler, value_type> alloc_traits;
+ raw_handler_ptr<alloc_traits> raw_ptr(handler);
+ handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_, handler);
+
+ start_connect_op(impl, ptr.get(), peer_endpoint);
+ ptr.release();
+ }
- // Make socket non-blocking.
- if (!(impl.flags_ & implementation_type::internal_non_blocking))
+private:
+ // Start the asynchronous read or write operation.
+ void start_op(implementation_type& impl, int op_type,
+ reactor_op* op, bool non_blocking, bool noop)
+ {
+ if (!noop)
{
- if (!(impl.flags_ & implementation_type::non_blocking))
+ if (is_open(impl))
{
- ioctl_arg_type non_blocking = 1;
- boost::system::error_code ec;
- if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
+ if (!non_blocking || is_non_blocking(impl)
+ || set_non_blocking(impl, op->ec_))
{
- this->get_io_service().post(bind_handler(handler, ec));
+ reactor_.start_op(op_type, impl.socket_,
+ impl.reactor_data_, op, non_blocking);
return;
}
}
- impl.flags_ |= implementation_type::internal_non_blocking;
+ else
+ op->ec_ = boost::asio::error::bad_descriptor;
}
- // Start the connect operation. The socket is already marked as non-blocking
- // so the connection will take place asynchronously.
- boost::system::error_code ec;
- if (socket_ops::connect(impl.socket_, peer_endpoint.data(),
- peer_endpoint.size(), ec) == 0)
+ io_service_impl_.post_immediate_completion(op);
+ }
+
+ // Start the asynchronous accept operation.
+ void start_accept_op(implementation_type& impl,
+ reactor_op* op, bool peer_is_open)
+ {
+ if (!peer_is_open)
+ start_op(impl, reactor::read_op, op, true, false);
+ else
{
- // The connect operation has finished successfully so we need to post the
- // handler immediately.
- this->get_io_service().post(bind_handler(handler,
- boost::system::error_code()));
+ op->ec_ = boost::asio::error::already_open;
+ io_service_impl_.post_immediate_completion(op);
}
- else if (ec == boost::asio::error::in_progress
- || ec == boost::asio::error::would_block)
+ }
+
+ // Start the asynchronous connect operation.
+ void start_connect_op(implementation_type& impl,
+ reactor_op* op, const endpoint_type& peer_endpoint)
+ {
+ if (is_open(impl))
{
- // The connection is happening in the background, and we need to wait
- // until the socket becomes writeable.
- reactor_.start_connect_op(impl.socket_, impl.reactor_data_,
- connect_operation<Handler>(impl.socket_,
- this->get_io_service(), handler));
+ if (is_non_blocking(impl) || set_non_blocking(impl, op->ec_))
+ {
+ if (socket_ops::connect(impl.socket_, peer_endpoint.data(),
+ peer_endpoint.size(), op->ec_) != 0)
+ {
+ if (op->ec_ == boost::asio::error::in_progress
+ || op->ec_ == boost::asio::error::would_block)
+ {
+ op->ec_ = boost::system::error_code();
+ reactor_.start_op(reactor::connect_op,
+ impl.socket_, impl.reactor_data_, op, false);
+ return;
+ }
+ }
+ }
}
else
- {
- // The connect operation has failed, so post the handler immediately.
- this->get_io_service().post(bind_handler(handler, ec));
- }
+ op->ec_ = boost::asio::error::bad_descriptor;
+
+ io_service_impl_.post_immediate_completion(op);
}
-private:
+ // Determine whether the socket has been set non-blocking.
+ bool is_non_blocking(implementation_type& impl) const
+ {
+ return (impl.flags_ & implementation_type::non_blocking);
+ }
+
+ // Set the internal non-blocking flag.
+ bool set_non_blocking(implementation_type& impl,
+ boost::system::error_code& ec)
+ {
+ ioctl_arg_type non_blocking = 1;
+ if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
+ return false;
+ impl.flags_ |= implementation_type::internal_non_blocking;
+ return true;
+ }
+
+ // The io_service implementation used to post completions.
+ io_service_impl& io_service_impl_;
+
// The selector that performs event demultiplexing for the service.
- Reactor& reactor_;
+ reactor& reactor_;
};
} // namespace detail