diff options
author | Remko Tronçon <git@el-tramo.be> | 2010-05-06 17:44:27 (GMT) |
---|---|---|
committer | Remko Tronçon <git@el-tramo.be> | 2010-05-06 17:44:27 (GMT) |
commit | d76ada0ab59634e3333f9eb5a92d0e850f60d7bf (patch) | |
tree | 5eaae441173fad2ec19ba67d6589f28ecd740991 /3rdParty/Boost/src/boost/asio/detail/reactive_socket_service.hpp | |
parent | 6f49e5abee37d37b351d68c01374232eccdac458 (diff) | |
download | swift-contrib-d76ada0ab59634e3333f9eb5a92d0e850f60d7bf.zip swift-contrib-d76ada0ab59634e3333f9eb5a92d0e850f60d7bf.tar.bz2 |
Updated Boost to 1.43.0.
Diffstat (limited to '3rdParty/Boost/src/boost/asio/detail/reactive_socket_service.hpp')
-rw-r--r-- | 3rdParty/Boost/src/boost/asio/detail/reactive_socket_service.hpp | 1195 |
1 files changed, 555 insertions, 640 deletions
diff --git a/3rdParty/Boost/src/boost/asio/detail/reactive_socket_service.hpp b/3rdParty/Boost/src/boost/asio/detail/reactive_socket_service.hpp index 95d39dd..5f7bbf5 100644 --- a/3rdParty/Boost/src/boost/asio/detail/reactive_socket_service.hpp +++ b/3rdParty/Boost/src/boost/asio/detail/reactive_socket_service.hpp @@ -17,18 +17,17 @@ #include <boost/asio/detail/push_options.hpp> -#include <boost/asio/detail/push_options.hpp> -#include <boost/shared_ptr.hpp> -#include <boost/asio/detail/pop_options.hpp> - #include <boost/asio/buffer.hpp> #include <boost/asio/error.hpp> #include <boost/asio/io_service.hpp> #include <boost/asio/socket_base.hpp> #include <boost/asio/detail/bind_handler.hpp> -#include <boost/asio/detail/handler_base_from_member.hpp> +#include <boost/asio/detail/buffer_sequence_adapter.hpp> +#include <boost/asio/detail/fenced_block.hpp> #include <boost/asio/detail/noncopyable.hpp> -#include <boost/asio/detail/service_base.hpp> +#include <boost/asio/detail/null_buffers_op.hpp> +#include <boost/asio/detail/reactor.hpp> +#include <boost/asio/detail/reactor_op.hpp> #include <boost/asio/detail/socket_holder.hpp> #include <boost/asio/detail/socket_ops.hpp> #include <boost/asio/detail/socket_types.hpp> @@ -37,10 +36,8 @@ namespace boost { namespace asio { namespace detail { -template <typename Protocol, typename Reactor> +template <typename Protocol> class reactive_socket_service - : public boost::asio::detail::service_base< - reactive_socket_service<Protocol, Reactor> > { public: // The protocol type. @@ -67,7 +64,7 @@ public: private: // Only this service will have access to the internal values. - friend class reactive_socket_service<Protocol, Reactor>; + friend class reactive_socket_service<Protocol>; // The native socket representation. socket_type socket_; @@ -87,7 +84,7 @@ public: // User wants connection_aborted errors, which are disabled by default. enable_connection_aborted = 4, - // The user set the linger option. Needs to be checked when closing. + // The user set the linger option. Needs to be checked when closing. user_set_linger = 8 }; @@ -98,17 +95,13 @@ public: protocol_type protocol_; // Per-descriptor data used by the reactor. - typename Reactor::per_descriptor_data reactor_data_; + reactor::per_descriptor_data reactor_data_; }; - // The maximum number of buffers to support in a single operation. - enum { max_buffers = 64 < max_iov_len ? 64 : max_iov_len }; - // Constructor. reactive_socket_service(boost::asio::io_service& io_service) - : boost::asio::detail::service_base< - reactive_socket_service<Protocol, Reactor> >(io_service), - reactor_(boost::asio::use_service<Reactor>(io_service)) + : io_service_impl_(use_service<io_service_impl>(io_service)), + reactor_(use_service<reactor>(io_service)) { reactor_.init_task(); } @@ -452,43 +445,30 @@ public: return ec; } - if (command.name() == static_cast<int>(FIONBIO)) + socket_ops::ioctl(impl.socket_, command.name(), + static_cast<ioctl_arg_type*>(command.data()), ec); + + // When updating the non-blocking mode we always perform the ioctl + // syscall, even if the flags would otherwise indicate that the socket is + // already in the correct state. This ensures that the underlying socket + // is put into the state that has been requested by the user. If the ioctl + // syscall was successful then we need to update the flags to match. + if (!ec && command.name() == static_cast<int>(FIONBIO)) { - // Flags are manipulated in a temporary variable so that the socket - // implementation is not updated unless the ioctl operation succeeds. - unsigned char new_flags = impl.flags_; if (*static_cast<ioctl_arg_type*>(command.data())) - new_flags |= implementation_type::user_set_non_blocking; - else - new_flags &= ~implementation_type::user_set_non_blocking; - - // Perform ioctl on socket if the non-blocking state has changed. - if (!(impl.flags_ & implementation_type::non_blocking) - && (new_flags & implementation_type::non_blocking)) { - ioctl_arg_type non_blocking = 1; - socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec); - } - else if ((impl.flags_ & implementation_type::non_blocking) - && !(new_flags & implementation_type::non_blocking)) - { - ioctl_arg_type non_blocking = 0; - socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec); + impl.flags_ |= implementation_type::user_set_non_blocking; } else { - ec = boost::system::error_code(); + // Clearing the non-blocking mode always overrides any internally-set + // non-blocking flag. Any subsequent asynchronous operations will need + // to re-enable non-blocking I/O. + impl.flags_ &= ~(implementation_type::user_set_non_blocking + | implementation_type::internal_non_blocking); } - - // Update socket implementation's flags only if successful. - if (!ec) - impl.flags_ = new_flags; - } - else - { - socket_ops::ioctl(impl.socket_, command.name(), - static_cast<ioctl_arg_type*>(command.data()), ec); } + return ec; } @@ -553,23 +533,11 @@ public: return 0; } - // Copy buffers into array. - socket_ops::buf bufs[max_buffers]; - typename ConstBufferSequence::const_iterator iter = buffers.begin(); - typename ConstBufferSequence::const_iterator end = buffers.end(); - size_t i = 0; - size_t total_buffer_size = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::const_buffer buffer(*iter); - socket_ops::init_buf(bufs[i], - boost::asio::buffer_cast<const void*>(buffer), - boost::asio::buffer_size(buffer)); - total_buffer_size += boost::asio::buffer_size(buffer); - } + buffer_sequence_adapter<boost::asio::const_buffer, + ConstBufferSequence> bufs(buffers); // A request to receive 0 bytes on a stream socket is a no-op. - if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0) + if (impl.protocol_.type() == SOCK_STREAM && bufs.all_empty()) { ec = boost::system::error_code(); return 0; @@ -579,7 +547,8 @@ public: for (;;) { // Try to complete the operation without blocking. - int bytes_sent = socket_ops::send(impl.socket_, bufs, i, flags, ec); + int bytes_sent = socket_ops::send(impl.socket_, + bufs.buffers(), bufs.count(), flags, ec); // Check if operation succeeded. if (bytes_sent >= 0) @@ -613,50 +582,32 @@ public: return 0; } - template <typename ConstBufferSequence, typename Handler> - class send_operation : - public handler_base_from_member<Handler> + template <typename ConstBufferSequence> + class send_op_base : public reactor_op { public: - send_operation(socket_type socket, boost::asio::io_service& io_service, - const ConstBufferSequence& buffers, socket_base::message_flags flags, - Handler handler) - : handler_base_from_member<Handler>(handler), + send_op_base(socket_type socket, const ConstBufferSequence& buffers, + socket_base::message_flags flags, func_type complete_func) + : reactor_op(&send_op_base::do_perform, complete_func), socket_(socket), - io_service_(io_service), - work_(io_service), buffers_(buffers), flags_(flags) { } - bool perform(boost::system::error_code& ec, - std::size_t& bytes_transferred) + static bool do_perform(reactor_op* base) { - // Check whether the operation was successful. - if (ec) - { - bytes_transferred = 0; - return true; - } + send_op_base* o(static_cast<send_op_base*>(base)); - // Copy buffers into array. - socket_ops::buf bufs[max_buffers]; - typename ConstBufferSequence::const_iterator iter = buffers_.begin(); - typename ConstBufferSequence::const_iterator end = buffers_.end(); - size_t i = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::const_buffer buffer(*iter); - socket_ops::init_buf(bufs[i], - boost::asio::buffer_cast<const void*>(buffer), - boost::asio::buffer_size(buffer)); - } + buffer_sequence_adapter<boost::asio::const_buffer, + ConstBufferSequence> bufs(o->buffers_); for (;;) { // Send the data. - int bytes = socket_ops::send(socket_, bufs, i, flags_, ec); + boost::system::error_code ec; + int bytes = socket_ops::send(o->socket_, + bufs.buffers(), bufs.count(), o->flags_, ec); // Retry operation if interrupted by signal. if (ec == boost::asio::error::interrupted) @@ -667,127 +618,92 @@ public: || ec == boost::asio::error::try_again) return false; - bytes_transferred = (bytes < 0 ? 0 : bytes); + o->ec_ = ec; + o->bytes_transferred_ = (bytes < 0 ? 0 : bytes); return true; } } - void complete(const boost::system::error_code& ec, - std::size_t bytes_transferred) - { - io_service_.post(bind_handler(this->handler_, ec, bytes_transferred)); - } - private: socket_type socket_; - boost::asio::io_service& io_service_; - boost::asio::io_service::work work_; ConstBufferSequence buffers_; socket_base::message_flags flags_; }; - // Start an asynchronous send. The data being sent must be valid for the - // lifetime of the asynchronous operation. template <typename ConstBufferSequence, typename Handler> - void async_send(implementation_type& impl, const ConstBufferSequence& buffers, - socket_base::message_flags flags, Handler handler) - { - if (!is_open(impl)) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); - } - else - { - if (impl.protocol_.type() == SOCK_STREAM) - { - // Determine total size of buffers. - typename ConstBufferSequence::const_iterator iter = buffers.begin(); - typename ConstBufferSequence::const_iterator end = buffers.end(); - size_t i = 0; - size_t total_buffer_size = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::const_buffer buffer(*iter); - total_buffer_size += boost::asio::buffer_size(buffer); - } - - // A request to receive 0 bytes on a stream socket is a no-op. - if (total_buffer_size == 0) - { - this->get_io_service().post(bind_handler(handler, - boost::system::error_code(), 0)); - return; - } - } - - // Make socket non-blocking. - if (!(impl.flags_ & implementation_type::internal_non_blocking)) - { - if (!(impl.flags_ & implementation_type::non_blocking)) - { - ioctl_arg_type non_blocking = 1; - boost::system::error_code ec; - if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) - { - this->get_io_service().post(bind_handler(handler, ec, 0)); - return; - } - } - impl.flags_ |= implementation_type::internal_non_blocking; - } - - reactor_.start_write_op(impl.socket_, impl.reactor_data_, - send_operation<ConstBufferSequence, Handler>( - impl.socket_, this->get_io_service(), buffers, flags, handler)); - } - } - - template <typename Handler> - class null_buffers_operation : - public handler_base_from_member<Handler> + class send_op : public send_op_base<ConstBufferSequence> { public: - null_buffers_operation(boost::asio::io_service& io_service, Handler handler) - : handler_base_from_member<Handler>(handler), - work_(io_service) + send_op(socket_type socket, const ConstBufferSequence& buffers, + socket_base::message_flags flags, Handler handler) + : send_op_base<ConstBufferSequence>(socket, + buffers, flags, &send_op::do_complete), + handler_(handler) { } - bool perform(boost::system::error_code&, - std::size_t& bytes_transferred) + static void do_complete(io_service_impl* owner, operation* base, + boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/) { - bytes_transferred = 0; - return true; - } + // Take ownership of the handler object. + send_op* o(static_cast<send_op*>(base)); + typedef handler_alloc_traits<Handler, send_op> alloc_traits; + handler_ptr<alloc_traits> ptr(o->handler_, o); - void complete(const boost::system::error_code& ec, - std::size_t bytes_transferred) - { - work_.get_io_service().post(bind_handler( - this->handler_, ec, bytes_transferred)); + // Make the upcall if required. + if (owner) + { + // Make a copy of the handler so that the memory can be deallocated + // before the upcall is made. Even if we're not about to make an + // upcall, a sub-object of the handler may be the true owner of the + // memory associated with the handler. Consequently, a local copy of + // the handler is required to ensure that any owning sub-object remains + // valid until after we have deallocated the memory here. + detail::binder2<Handler, boost::system::error_code, std::size_t> + handler(o->handler_, o->ec_, o->bytes_transferred_); + ptr.reset(); + boost::asio::detail::fenced_block b; + boost_asio_handler_invoke_helpers::invoke(handler, handler); + } } private: - boost::asio::io_service::work work_; + Handler handler_; }; + // Start an asynchronous send. The data being sent must be valid for the + // lifetime of the asynchronous operation. + template <typename ConstBufferSequence, typename Handler> + void async_send(implementation_type& impl, const ConstBufferSequence& buffers, + socket_base::message_flags flags, Handler handler) + { + // Allocate and construct an operation to wrap the handler. + typedef send_op<ConstBufferSequence, Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, + impl.socket_, buffers, flags, handler); + + start_op(impl, reactor::write_op, ptr.get(), true, + (impl.protocol_.type() == SOCK_STREAM + && buffer_sequence_adapter<boost::asio::const_buffer, + ConstBufferSequence>::all_empty(buffers))); + ptr.release(); + } + // Start an asynchronous wait until data can be sent without blocking. template <typename Handler> void async_send(implementation_type& impl, const null_buffers&, socket_base::message_flags, Handler handler) { - if (!is_open(impl)) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); - } - else - { - reactor_.start_write_op(impl.socket_, impl.reactor_data_, - null_buffers_operation<Handler>(this->get_io_service(), handler), - false); - } + // Allocate and construct an operation to wrap the handler. + typedef null_buffers_op<Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, handler); + + start_op(impl, reactor::write_op, ptr.get(), false, false); + ptr.release(); } // Send a datagram to the specified endpoint. Returns the number of bytes @@ -803,25 +719,15 @@ public: return 0; } - // Copy buffers into array. - socket_ops::buf bufs[max_buffers]; - typename ConstBufferSequence::const_iterator iter = buffers.begin(); - typename ConstBufferSequence::const_iterator end = buffers.end(); - size_t i = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::const_buffer buffer(*iter); - socket_ops::init_buf(bufs[i], - boost::asio::buffer_cast<const void*>(buffer), - boost::asio::buffer_size(buffer)); - } + buffer_sequence_adapter<boost::asio::const_buffer, + ConstBufferSequence> bufs(buffers); // Send the data. for (;;) { // Try to complete the operation without blocking. - int bytes_sent = socket_ops::sendto(impl.socket_, bufs, i, flags, - destination.data(), destination.size(), ec); + int bytes_sent = socket_ops::sendto(impl.socket_, bufs.buffers(), + bufs.count(), flags, destination.data(), destination.size(), ec); // Check if operation succeeded. if (bytes_sent >= 0) @@ -856,52 +762,34 @@ public: return 0; } - template <typename ConstBufferSequence, typename Handler> - class send_to_operation : - public handler_base_from_member<Handler> + template <typename ConstBufferSequence> + class send_to_op_base : public reactor_op { public: - send_to_operation(socket_type socket, boost::asio::io_service& io_service, - const ConstBufferSequence& buffers, const endpoint_type& endpoint, - socket_base::message_flags flags, Handler handler) - : handler_base_from_member<Handler>(handler), + send_to_op_base(socket_type socket, const ConstBufferSequence& buffers, + const endpoint_type& endpoint, socket_base::message_flags flags, + func_type complete_func) + : reactor_op(&send_to_op_base::do_perform, complete_func), socket_(socket), - io_service_(io_service), - work_(io_service), buffers_(buffers), destination_(endpoint), flags_(flags) { } - bool perform(boost::system::error_code& ec, - std::size_t& bytes_transferred) + static bool do_perform(reactor_op* base) { - // Check whether the operation was successful. - if (ec) - { - bytes_transferred = 0; - return true; - } + send_to_op_base* o(static_cast<send_to_op_base*>(base)); - // Copy buffers into array. - socket_ops::buf bufs[max_buffers]; - typename ConstBufferSequence::const_iterator iter = buffers_.begin(); - typename ConstBufferSequence::const_iterator end = buffers_.end(); - size_t i = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::const_buffer buffer(*iter); - socket_ops::init_buf(bufs[i], - boost::asio::buffer_cast<const void*>(buffer), - boost::asio::buffer_size(buffer)); - } + buffer_sequence_adapter<boost::asio::const_buffer, + ConstBufferSequence> bufs(o->buffers_); for (;;) { // Send the data. - int bytes = socket_ops::sendto(socket_, bufs, i, flags_, - destination_.data(), destination_.size(), ec); + boost::system::error_code ec; + int bytes = socket_ops::sendto(o->socket_, bufs.buffers(), bufs.count(), + o->flags_, o->destination_.data(), o->destination_.size(), ec); // Retry operation if interrupted by signal. if (ec == boost::asio::error::interrupted) @@ -912,62 +800,78 @@ public: || ec == boost::asio::error::try_again) return false; - bytes_transferred = (bytes < 0 ? 0 : bytes); + o->ec_ = ec; + o->bytes_transferred_ = (bytes < 0 ? 0 : bytes); return true; } } - void complete(const boost::system::error_code& ec, - std::size_t bytes_transferred) - { - io_service_.post(bind_handler(this->handler_, ec, bytes_transferred)); - } - private: socket_type socket_; - boost::asio::io_service& io_service_; - boost::asio::io_service::work work_; ConstBufferSequence buffers_; endpoint_type destination_; socket_base::message_flags flags_; }; - // Start an asynchronous send. The data being sent must be valid for the - // lifetime of the asynchronous operation. template <typename ConstBufferSequence, typename Handler> - void async_send_to(implementation_type& impl, - const ConstBufferSequence& buffers, - const endpoint_type& destination, socket_base::message_flags flags, - Handler handler) + class send_to_op : public send_to_op_base<ConstBufferSequence> { - if (!is_open(impl)) + public: + send_to_op(socket_type socket, const ConstBufferSequence& buffers, + const endpoint_type& endpoint, socket_base::message_flags flags, + Handler handler) + : send_to_op_base<ConstBufferSequence>(socket, + buffers, endpoint, flags, &send_to_op::do_complete), + handler_(handler) { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); } - else + + static void do_complete(io_service_impl* owner, operation* base, + boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/) { - // Make socket non-blocking. - if (!(impl.flags_ & implementation_type::internal_non_blocking)) + // Take ownership of the handler object. + send_to_op* o(static_cast<send_to_op*>(base)); + typedef handler_alloc_traits<Handler, send_to_op> alloc_traits; + handler_ptr<alloc_traits> ptr(o->handler_, o); + + // Make the upcall if required. + if (owner) { - if (!(impl.flags_ & implementation_type::non_blocking)) - { - ioctl_arg_type non_blocking = 1; - boost::system::error_code ec; - if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) - { - this->get_io_service().post(bind_handler(handler, ec, 0)); - return; - } - } - impl.flags_ |= implementation_type::internal_non_blocking; + // Make a copy of the handler so that the memory can be deallocated + // before the upcall is made. Even if we're not about to make an + // upcall, a sub-object of the handler may be the true owner of the + // memory associated with the handler. Consequently, a local copy of + // the handler is required to ensure that any owning sub-object remains + // valid until after we have deallocated the memory here. + detail::binder2<Handler, boost::system::error_code, std::size_t> + handler(o->handler_, o->ec_, o->bytes_transferred_); + ptr.reset(); + boost::asio::detail::fenced_block b; + boost_asio_handler_invoke_helpers::invoke(handler, handler); } - - reactor_.start_write_op(impl.socket_, impl.reactor_data_, - send_to_operation<ConstBufferSequence, Handler>( - impl.socket_, this->get_io_service(), buffers, - destination, flags, handler)); } + + private: + Handler handler_; + }; + + // Start an asynchronous send. The data being sent must be valid for the + // lifetime of the asynchronous operation. + template <typename ConstBufferSequence, typename Handler> + void async_send_to(implementation_type& impl, + const ConstBufferSequence& buffers, + const endpoint_type& destination, socket_base::message_flags flags, + Handler handler) + { + // Allocate and construct an operation to wrap the handler. + typedef send_to_op<ConstBufferSequence, Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_, + buffers, destination, flags, handler); + + start_op(impl, reactor::write_op, ptr.get(), true, false); + ptr.release(); } // Start an asynchronous wait until data can be sent without blocking. @@ -975,17 +879,14 @@ public: void async_send_to(implementation_type& impl, const null_buffers&, socket_base::message_flags, const endpoint_type&, Handler handler) { - if (!is_open(impl)) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); - } - else - { - reactor_.start_write_op(impl.socket_, impl.reactor_data_, - null_buffers_operation<Handler>(this->get_io_service(), handler), - false); - } + // Allocate and construct an operation to wrap the handler. + typedef null_buffers_op<Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, handler); + + start_op(impl, reactor::write_op, ptr.get(), false, false); + ptr.release(); } // Receive some data from the peer. Returns the number of bytes received. @@ -1000,23 +901,11 @@ public: return 0; } - // Copy buffers into array. - socket_ops::buf bufs[max_buffers]; - typename MutableBufferSequence::const_iterator iter = buffers.begin(); - typename MutableBufferSequence::const_iterator end = buffers.end(); - size_t i = 0; - size_t total_buffer_size = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::mutable_buffer buffer(*iter); - socket_ops::init_buf(bufs[i], - boost::asio::buffer_cast<void*>(buffer), - boost::asio::buffer_size(buffer)); - total_buffer_size += boost::asio::buffer_size(buffer); - } + buffer_sequence_adapter<boost::asio::mutable_buffer, + MutableBufferSequence> bufs(buffers); // A request to receive 0 bytes on a stream socket is a no-op. - if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0) + if (impl.protocol_.type() == SOCK_STREAM && bufs.all_empty()) { ec = boost::system::error_code(); return 0; @@ -1026,7 +915,8 @@ public: for (;;) { // Try to complete the operation without blocking. - int bytes_recvd = socket_ops::recv(impl.socket_, bufs, i, flags, ec); + int bytes_recvd = socket_ops::recv(impl.socket_, + bufs.buffers(), bufs.count(), flags, ec); // Check if operation succeeded. if (bytes_recvd > 0) @@ -1067,53 +957,35 @@ public: return 0; } - template <typename MutableBufferSequence, typename Handler> - class receive_operation : - public handler_base_from_member<Handler> + template <typename MutableBufferSequence> + class receive_op_base : public reactor_op { public: - receive_operation(socket_type socket, int protocol_type, - boost::asio::io_service& io_service, + receive_op_base(socket_type socket, int protocol_type, const MutableBufferSequence& buffers, - socket_base::message_flags flags, Handler handler) - : handler_base_from_member<Handler>(handler), + socket_base::message_flags flags, func_type complete_func) + : reactor_op(&receive_op_base::do_perform, complete_func), socket_(socket), protocol_type_(protocol_type), - io_service_(io_service), - work_(io_service), buffers_(buffers), flags_(flags) { } - bool perform(boost::system::error_code& ec, - std::size_t& bytes_transferred) + static bool do_perform(reactor_op* base) { - // Check whether the operation was successful. - if (ec) - { - bytes_transferred = 0; - return true; - } + receive_op_base* o(static_cast<receive_op_base*>(base)); - // Copy buffers into array. - socket_ops::buf bufs[max_buffers]; - typename MutableBufferSequence::const_iterator iter = buffers_.begin(); - typename MutableBufferSequence::const_iterator end = buffers_.end(); - size_t i = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::mutable_buffer buffer(*iter); - socket_ops::init_buf(bufs[i], - boost::asio::buffer_cast<void*>(buffer), - boost::asio::buffer_size(buffer)); - } + buffer_sequence_adapter<boost::asio::mutable_buffer, + MutableBufferSequence> bufs(o->buffers_); for (;;) { // Receive some data. - int bytes = socket_ops::recv(socket_, bufs, i, flags_, ec); - if (bytes == 0 && protocol_type_ == SOCK_STREAM) + boost::system::error_code ec; + int bytes = socket_ops::recv(o->socket_, + bufs.buffers(), bufs.count(), o->flags_, ec); + if (bytes == 0 && o->protocol_type_ == SOCK_STREAM) ec = boost::asio::error::eof; // Retry operation if interrupted by signal. @@ -1125,93 +997,84 @@ public: || ec == boost::asio::error::try_again) return false; - bytes_transferred = (bytes < 0 ? 0 : bytes); + o->ec_ = ec; + o->bytes_transferred_ = (bytes < 0 ? 0 : bytes); return true; } } - void complete(const boost::system::error_code& ec, - std::size_t bytes_transferred) - { - io_service_.post(bind_handler(this->handler_, ec, bytes_transferred)); - } - private: socket_type socket_; int protocol_type_; - boost::asio::io_service& io_service_; - boost::asio::io_service::work work_; MutableBufferSequence buffers_; socket_base::message_flags flags_; }; - // Start an asynchronous receive. The buffer for the data being received - // must be valid for the lifetime of the asynchronous operation. template <typename MutableBufferSequence, typename Handler> - void async_receive(implementation_type& impl, - const MutableBufferSequence& buffers, - socket_base::message_flags flags, Handler handler) + class receive_op : public receive_op_base<MutableBufferSequence> { - if (!is_open(impl)) + public: + receive_op(socket_type socket, int protocol_type, + const MutableBufferSequence& buffers, + socket_base::message_flags flags, Handler handler) + : receive_op_base<MutableBufferSequence>(socket, + protocol_type, buffers, flags, &receive_op::do_complete), + handler_(handler) { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); } - else - { - if (impl.protocol_.type() == SOCK_STREAM) - { - // Determine total size of buffers. - typename MutableBufferSequence::const_iterator iter = buffers.begin(); - typename MutableBufferSequence::const_iterator end = buffers.end(); - size_t i = 0; - size_t total_buffer_size = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::mutable_buffer buffer(*iter); - total_buffer_size += boost::asio::buffer_size(buffer); - } - - // A request to receive 0 bytes on a stream socket is a no-op. - if (total_buffer_size == 0) - { - this->get_io_service().post(bind_handler(handler, - boost::system::error_code(), 0)); - return; - } - } - // Make socket non-blocking. - if (!(impl.flags_ & implementation_type::internal_non_blocking)) - { - if (!(impl.flags_ & implementation_type::non_blocking)) - { - ioctl_arg_type non_blocking = 1; - boost::system::error_code ec; - if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) - { - this->get_io_service().post(bind_handler(handler, ec, 0)); - return; - } - } - impl.flags_ |= implementation_type::internal_non_blocking; - } + static void do_complete(io_service_impl* owner, operation* base, + boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/) + { + // Take ownership of the handler object. + receive_op* o(static_cast<receive_op*>(base)); + typedef handler_alloc_traits<Handler, receive_op> alloc_traits; + handler_ptr<alloc_traits> ptr(o->handler_, o); - if (flags & socket_base::message_out_of_band) + // Make the upcall if required. + if (owner) { - reactor_.start_except_op(impl.socket_, impl.reactor_data_, - receive_operation<MutableBufferSequence, Handler>( - impl.socket_, impl.protocol_.type(), - this->get_io_service(), buffers, flags, handler)); - } - else - { - reactor_.start_read_op(impl.socket_, impl.reactor_data_, - receive_operation<MutableBufferSequence, Handler>( - impl.socket_, impl.protocol_.type(), - this->get_io_service(), buffers, flags, handler)); + // Make a copy of the handler so that the memory can be deallocated + // before the upcall is made. Even if we're not about to make an + // upcall, a sub-object of the handler may be the true owner of the + // memory associated with the handler. Consequently, a local copy of + // the handler is required to ensure that any owning sub-object remains + // valid until after we have deallocated the memory here. + detail::binder2<Handler, boost::system::error_code, std::size_t> + handler(o->handler_, o->ec_, o->bytes_transferred_); + ptr.reset(); + boost::asio::detail::fenced_block b; + boost_asio_handler_invoke_helpers::invoke(handler, handler); } } + + private: + Handler handler_; + }; + + // Start an asynchronous receive. The buffer for the data being received + // must be valid for the lifetime of the asynchronous operation. + template <typename MutableBufferSequence, typename Handler> + void async_receive(implementation_type& impl, + const MutableBufferSequence& buffers, + socket_base::message_flags flags, Handler handler) + { + // Allocate and construct an operation to wrap the handler. + typedef receive_op<MutableBufferSequence, Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + int protocol_type = impl.protocol_.type(); + handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_, + protocol_type, buffers, flags, handler); + + start_op(impl, + (flags & socket_base::message_out_of_band) + ? reactor::except_op : reactor::read_op, + ptr.get(), (flags & socket_base::message_out_of_band) == 0, + (impl.protocol_.type() == SOCK_STREAM + && buffer_sequence_adapter<boost::asio::mutable_buffer, + MutableBufferSequence>::all_empty(buffers))); + ptr.release(); } // Wait until data can be received without blocking. @@ -1219,22 +1082,17 @@ public: void async_receive(implementation_type& impl, const null_buffers&, socket_base::message_flags flags, Handler handler) { - if (!is_open(impl)) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); - } - else if (flags & socket_base::message_out_of_band) - { - reactor_.start_except_op(impl.socket_, impl.reactor_data_, - null_buffers_operation<Handler>(this->get_io_service(), handler)); - } - else - { - reactor_.start_read_op(impl.socket_, impl.reactor_data_, - null_buffers_operation<Handler>(this->get_io_service(), handler), - false); - } + // Allocate and construct an operation to wrap the handler. + typedef null_buffers_op<Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, handler); + + start_op(impl, + (flags & socket_base::message_out_of_band) + ? reactor::except_op : reactor::read_op, + ptr.get(), false, false); + ptr.release(); } // Receive a datagram with the endpoint of the sender. Returns the number of @@ -1251,26 +1109,16 @@ public: return 0; } - // Copy buffers into array. - socket_ops::buf bufs[max_buffers]; - typename MutableBufferSequence::const_iterator iter = buffers.begin(); - typename MutableBufferSequence::const_iterator end = buffers.end(); - size_t i = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::mutable_buffer buffer(*iter); - socket_ops::init_buf(bufs[i], - boost::asio::buffer_cast<void*>(buffer), - boost::asio::buffer_size(buffer)); - } + buffer_sequence_adapter<boost::asio::mutable_buffer, + MutableBufferSequence> bufs(buffers); // Receive some data. for (;;) { // Try to complete the operation without blocking. std::size_t addr_len = sender_endpoint.capacity(); - int bytes_recvd = socket_ops::recvfrom(impl.socket_, bufs, i, flags, - sender_endpoint.data(), &addr_len, ec); + int bytes_recvd = socket_ops::recvfrom(impl.socket_, bufs.buffers(), + bufs.count(), flags, sender_endpoint.data(), &addr_len, ec); // Check if operation succeeded. if (bytes_recvd > 0) @@ -1318,56 +1166,37 @@ public: return 0; } - template <typename MutableBufferSequence, typename Handler> - class receive_from_operation : - public handler_base_from_member<Handler> + template <typename MutableBufferSequence> + class receive_from_op_base : public reactor_op { public: - receive_from_operation(socket_type socket, int protocol_type, - boost::asio::io_service& io_service, + receive_from_op_base(socket_type socket, int protocol_type, const MutableBufferSequence& buffers, endpoint_type& endpoint, - socket_base::message_flags flags, Handler handler) - : handler_base_from_member<Handler>(handler), + socket_base::message_flags flags, func_type complete_func) + : reactor_op(&receive_from_op_base::do_perform, complete_func), socket_(socket), protocol_type_(protocol_type), - io_service_(io_service), - work_(io_service), buffers_(buffers), sender_endpoint_(endpoint), flags_(flags) { } - bool perform(boost::system::error_code& ec, - std::size_t& bytes_transferred) + static bool do_perform(reactor_op* base) { - // Check whether the operation was successful. - if (ec) - { - bytes_transferred = 0; - return true; - } + receive_from_op_base* o(static_cast<receive_from_op_base*>(base)); - // Copy buffers into array. - socket_ops::buf bufs[max_buffers]; - typename MutableBufferSequence::const_iterator iter = buffers_.begin(); - typename MutableBufferSequence::const_iterator end = buffers_.end(); - size_t i = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::mutable_buffer buffer(*iter); - socket_ops::init_buf(bufs[i], - boost::asio::buffer_cast<void*>(buffer), - boost::asio::buffer_size(buffer)); - } + buffer_sequence_adapter<boost::asio::mutable_buffer, + MutableBufferSequence> bufs(o->buffers_); for (;;) { // Receive some data. - std::size_t addr_len = sender_endpoint_.capacity(); - int bytes = socket_ops::recvfrom(socket_, bufs, i, flags_, - sender_endpoint_.data(), &addr_len, ec); - if (bytes == 0 && protocol_type_ == SOCK_STREAM) + boost::system::error_code ec; + std::size_t addr_len = o->sender_endpoint_.capacity(); + int bytes = socket_ops::recvfrom(o->socket_, bufs.buffers(), + bufs.count(), o->flags_, o->sender_endpoint_.data(), &addr_len, ec); + if (bytes == 0 && o->protocol_type_ == SOCK_STREAM) ec = boost::asio::error::eof; // Retry operation if interrupted by signal. @@ -1379,64 +1208,84 @@ public: || ec == boost::asio::error::try_again) return false; - sender_endpoint_.resize(addr_len); - bytes_transferred = (bytes < 0 ? 0 : bytes); + o->sender_endpoint_.resize(addr_len); + o->ec_ = ec; + o->bytes_transferred_ = (bytes < 0 ? 0 : bytes); return true; } } - void complete(const boost::system::error_code& ec, - std::size_t bytes_transferred) - { - io_service_.post(bind_handler(this->handler_, ec, bytes_transferred)); - } - private: socket_type socket_; int protocol_type_; - boost::asio::io_service& io_service_; - boost::asio::io_service::work work_; MutableBufferSequence buffers_; endpoint_type& sender_endpoint_; socket_base::message_flags flags_; }; - // Start an asynchronous receive. The buffer for the data being received and - // the sender_endpoint object must both be valid for the lifetime of the - // asynchronous operation. template <typename MutableBufferSequence, typename Handler> - void async_receive_from(implementation_type& impl, - const MutableBufferSequence& buffers, endpoint_type& sender_endpoint, - socket_base::message_flags flags, Handler handler) + class receive_from_op : public receive_from_op_base<MutableBufferSequence> { - if (!is_open(impl)) + public: + receive_from_op(socket_type socket, int protocol_type, + const MutableBufferSequence& buffers, endpoint_type& endpoint, + socket_base::message_flags flags, Handler handler) + : receive_from_op_base<MutableBufferSequence>(socket, protocol_type, + buffers, endpoint, flags, &receive_from_op::do_complete), + handler_(handler) { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); } - else + + static void do_complete(io_service_impl* owner, operation* base, + boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/) { - // Make socket non-blocking. - if (!(impl.flags_ & implementation_type::internal_non_blocking)) + // Take ownership of the handler object. + receive_from_op* o(static_cast<receive_from_op*>(base)); + typedef handler_alloc_traits<Handler, receive_from_op> alloc_traits; + handler_ptr<alloc_traits> ptr(o->handler_, o); + + // Make the upcall if required. + if (owner) { - if (!(impl.flags_ & implementation_type::non_blocking)) - { - ioctl_arg_type non_blocking = 1; - boost::system::error_code ec; - if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) - { - this->get_io_service().post(bind_handler(handler, ec, 0)); - return; - } - } - impl.flags_ |= implementation_type::internal_non_blocking; + // Make a copy of the handler so that the memory can be deallocated + // before the upcall is made. Even if we're not about to make an + // upcall, a sub-object of the handler may be the true owner of the + // memory associated with the handler. Consequently, a local copy of + // the handler is required to ensure that any owning sub-object remains + // valid until after we have deallocated the memory here. + detail::binder2<Handler, boost::system::error_code, std::size_t> + handler(o->handler_, o->ec_, o->bytes_transferred_); + ptr.reset(); + boost::asio::detail::fenced_block b; + boost_asio_handler_invoke_helpers::invoke(handler, handler); } - - reactor_.start_read_op(impl.socket_, impl.reactor_data_, - receive_from_operation<MutableBufferSequence, Handler>( - impl.socket_, impl.protocol_.type(), this->get_io_service(), - buffers, sender_endpoint, flags, handler)); } + + private: + Handler handler_; + }; + + // Start an asynchronous receive. The buffer for the data being received and + // the sender_endpoint object must both be valid for the lifetime of the + // asynchronous operation. + template <typename MutableBufferSequence, typename Handler> + void async_receive_from(implementation_type& impl, + const MutableBufferSequence& buffers, endpoint_type& sender_endpoint, + socket_base::message_flags flags, Handler handler) + { + // Allocate and construct an operation to wrap the handler. + typedef receive_from_op<MutableBufferSequence, Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + int protocol_type = impl.protocol_.type(); + handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_, + protocol_type, buffers, sender_endpoint, flags, handler); + + start_op(impl, + (flags & socket_base::message_out_of_band) + ? reactor::except_op : reactor::read_op, + ptr.get(), true, false); + ptr.release(); } // Wait until data can be received without blocking. @@ -1445,28 +1294,20 @@ public: const null_buffers&, endpoint_type& sender_endpoint, socket_base::message_flags flags, Handler handler) { - if (!is_open(impl)) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); - } - else - { - // Reset endpoint since it can be given no sensible value at this time. - sender_endpoint = endpoint_type(); + // Allocate and construct an operation to wrap the handler. + typedef null_buffers_op<Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, handler); - if (flags & socket_base::message_out_of_band) - { - reactor_.start_except_op(impl.socket_, impl.reactor_data_, - null_buffers_operation<Handler>(this->get_io_service(), handler)); - } - else - { - reactor_.start_read_op(impl.socket_, impl.reactor_data_, - null_buffers_operation<Handler>(this->get_io_service(), handler), - false); - } - } + // Reset endpoint since it can be given no sensible value at this time. + sender_endpoint = endpoint_type(); + + start_op(impl, + (flags & socket_base::message_out_of_band) + ? reactor::except_op : reactor::read_op, + ptr.get(), false, false); + ptr.release(); } // Accept a new connection. @@ -1546,19 +1387,15 @@ public: } } - template <typename Socket, typename Handler> - class accept_operation : - public handler_base_from_member<Handler> + template <typename Socket> + class accept_op_base : public reactor_op { public: - accept_operation(socket_type socket, boost::asio::io_service& io_service, - Socket& peer, const protocol_type& protocol, - endpoint_type* peer_endpoint, bool enable_connection_aborted, - Handler handler) - : handler_base_from_member<Handler>(handler), + accept_op_base(socket_type socket, Socket& peer, + const protocol_type& protocol, endpoint_type* peer_endpoint, + bool enable_connection_aborted, func_type complete_func) + : reactor_op(&accept_op_base::do_perform, complete_func), socket_(socket), - io_service_(io_service), - work_(io_service), peer_(peer), protocol_(protocol), peer_endpoint_(peer_endpoint), @@ -1566,27 +1403,25 @@ public: { } - bool perform(boost::system::error_code& ec, std::size_t&) + static bool do_perform(reactor_op* base) { - // Check whether the operation was successful. - if (ec) - return true; + accept_op_base* o(static_cast<accept_op_base*>(base)); for (;;) { // Accept the waiting connection. + boost::system::error_code ec; socket_holder new_socket; std::size_t addr_len = 0; - if (peer_endpoint_) - { - addr_len = peer_endpoint_->capacity(); - new_socket.reset(socket_ops::accept(socket_, - peer_endpoint_->data(), &addr_len, ec)); - } - else + std::size_t* addr_len_p = 0; + socket_addr_type* addr = 0; + if (o->peer_endpoint_) { - new_socket.reset(socket_ops::accept(socket_, 0, 0, ec)); + addr_len = o->peer_endpoint_->capacity(); + addr_len_p = &addr_len; + addr = o->peer_endpoint_->data(); } + new_socket.reset(socket_ops::accept(o->socket_, addr, addr_len_p, ec)); // Retry operation if interrupted by signal. if (ec == boost::asio::error::interrupted) @@ -1597,83 +1432,95 @@ public: || ec == boost::asio::error::try_again) return false; if (ec == boost::asio::error::connection_aborted - && !enable_connection_aborted_) + && !o->enable_connection_aborted_) return false; #if defined(EPROTO) - if (ec.value() == EPROTO && !enable_connection_aborted_) + if (ec.value() == EPROTO && !o->enable_connection_aborted_) return false; #endif // defined(EPROTO) // Transfer ownership of the new socket to the peer object. if (!ec) { - if (peer_endpoint_) - peer_endpoint_->resize(addr_len); - peer_.assign(protocol_, new_socket.get(), ec); + if (o->peer_endpoint_) + o->peer_endpoint_->resize(addr_len); + o->peer_.assign(o->protocol_, new_socket.get(), ec); if (!ec) new_socket.release(); } + o->ec_ = ec; return true; } } - void complete(const boost::system::error_code& ec, std::size_t) - { - io_service_.post(bind_handler(this->handler_, ec)); - } - private: socket_type socket_; - boost::asio::io_service& io_service_; - boost::asio::io_service::work work_; Socket& peer_; protocol_type protocol_; endpoint_type* peer_endpoint_; bool enable_connection_aborted_; }; - // Start an asynchronous accept. The peer and peer_endpoint objects - // must be valid until the accept's handler is invoked. template <typename Socket, typename Handler> - void async_accept(implementation_type& impl, Socket& peer, - endpoint_type* peer_endpoint, Handler handler) + class accept_op : public accept_op_base<Socket> { - if (!is_open(impl)) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor)); - } - else if (peer.is_open()) + public: + accept_op(socket_type socket, Socket& peer, const protocol_type& protocol, + endpoint_type* peer_endpoint, bool enable_connection_aborted, + Handler handler) + : accept_op_base<Socket>(socket, peer, protocol, peer_endpoint, + enable_connection_aborted, &accept_op::do_complete), + handler_(handler) { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::already_open)); } - else + + static void do_complete(io_service_impl* owner, operation* base, + boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/) { - // Make socket non-blocking. - if (!(impl.flags_ & implementation_type::internal_non_blocking)) + // Take ownership of the handler object. + accept_op* o(static_cast<accept_op*>(base)); + typedef handler_alloc_traits<Handler, accept_op> alloc_traits; + handler_ptr<alloc_traits> ptr(o->handler_, o); + + // Make the upcall if required. + if (owner) { - if (!(impl.flags_ & implementation_type::non_blocking)) - { - ioctl_arg_type non_blocking = 1; - boost::system::error_code ec; - if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) - { - this->get_io_service().post(bind_handler(handler, ec)); - return; - } - } - impl.flags_ |= implementation_type::internal_non_blocking; + // Make a copy of the handler so that the memory can be deallocated + // before the upcall is made. Even if we're not about to make an + // upcall, a sub-object of the handler may be the true owner of the + // memory associated with the handler. Consequently, a local copy of + // the handler is required to ensure that any owning sub-object remains + // valid until after we have deallocated the memory here. + detail::binder1<Handler, boost::system::error_code> + handler(o->handler_, o->ec_); + ptr.reset(); + boost::asio::detail::fenced_block b; + boost_asio_handler_invoke_helpers::invoke(handler, handler); } - - reactor_.start_read_op(impl.socket_, impl.reactor_data_, - accept_operation<Socket, Handler>( - impl.socket_, this->get_io_service(), - peer, impl.protocol_, peer_endpoint, - (impl.flags_ & implementation_type::enable_connection_aborted) != 0, - handler)); } + + private: + Handler handler_; + }; + + // Start an asynchronous accept. The peer and peer_endpoint objects + // must be valid until the accept's handler is invoked. + template <typename Socket, typename Handler> + void async_accept(implementation_type& impl, Socket& peer, + endpoint_type* peer_endpoint, Handler handler) + { + // Allocate and construct an operation to wrap the handler. + typedef accept_op<Socket, Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + bool enable_connection_aborted = + (impl.flags_ & implementation_type::enable_connection_aborted) != 0; + handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_, peer, + impl.protocol_, peer_endpoint, enable_connection_aborted, handler); + + start_accept_op(impl, ptr.get(), peer.is_open()); + ptr.release(); } // Connect the socket to the specified endpoint. @@ -1713,53 +1560,77 @@ public: return ec; } - template <typename Handler> - class connect_operation : - public handler_base_from_member<Handler> + class connect_op_base : public reactor_op { public: - connect_operation(socket_type socket, - boost::asio::io_service& io_service, Handler handler) - : handler_base_from_member<Handler>(handler), - socket_(socket), - io_service_(io_service), - work_(io_service) + connect_op_base(socket_type socket, func_type complete_func) + : reactor_op(&connect_op_base::do_perform, complete_func), + socket_(socket) { } - bool perform(boost::system::error_code& ec, std::size_t&) + static bool do_perform(reactor_op* base) { - // Check whether the operation was successful. - if (ec) - return true; + connect_op_base* o(static_cast<connect_op_base*>(base)); // Get the error code from the connect operation. int connect_error = 0; size_t connect_error_len = sizeof(connect_error); - if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR, - &connect_error, &connect_error_len, ec) == socket_error_retval) + if (socket_ops::getsockopt(o->socket_, SOL_SOCKET, SO_ERROR, + &connect_error, &connect_error_len, o->ec_) == socket_error_retval) return true; // The connection failed so the handler will be posted with an error code. if (connect_error) { - ec = boost::system::error_code(connect_error, + o->ec_ = boost::system::error_code(connect_error, boost::asio::error::get_system_category()); - return true; } return true; } - void complete(const boost::system::error_code& ec, std::size_t) + private: + socket_type socket_; + }; + + template <typename Handler> + class connect_op : public connect_op_base + { + public: + connect_op(socket_type socket, Handler handler) + : connect_op_base(socket, &connect_op::do_complete), + handler_(handler) { - io_service_.post(bind_handler(this->handler_, ec)); + } + + static void do_complete(io_service_impl* owner, operation* base, + boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/) + { + // Take ownership of the handler object. + connect_op* o(static_cast<connect_op*>(base)); + typedef handler_alloc_traits<Handler, connect_op> alloc_traits; + handler_ptr<alloc_traits> ptr(o->handler_, o); + + // Make the upcall if required. + if (owner) + { + // Make a copy of the handler so that the memory can be deallocated + // before the upcall is made. Even if we're not about to make an + // upcall, a sub-object of the handler may be the true owner of the + // memory associated with the handler. Consequently, a local copy of + // the handler is required to ensure that any owning sub-object remains + // valid until after we have deallocated the memory here. + detail::binder1<Handler, boost::system::error_code> + handler(o->handler_, o->ec_); + ptr.reset(); + boost::asio::detail::fenced_block b; + boost_asio_handler_invoke_helpers::invoke(handler, handler); + } } private: - socket_type socket_; - boost::asio::io_service& io_service_; - boost::asio::io_service::work work_; + Handler handler_; }; // Start an asynchronous connect. @@ -1767,59 +1638,103 @@ public: void async_connect(implementation_type& impl, const endpoint_type& peer_endpoint, Handler handler) { - if (!is_open(impl)) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor)); - return; - } + // Allocate and construct an operation to wrap the handler. + typedef connect_op<Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_, handler); + + start_connect_op(impl, ptr.get(), peer_endpoint); + ptr.release(); + } - // Make socket non-blocking. - if (!(impl.flags_ & implementation_type::internal_non_blocking)) +private: + // Start the asynchronous read or write operation. + void start_op(implementation_type& impl, int op_type, + reactor_op* op, bool non_blocking, bool noop) + { + if (!noop) { - if (!(impl.flags_ & implementation_type::non_blocking)) + if (is_open(impl)) { - ioctl_arg_type non_blocking = 1; - boost::system::error_code ec; - if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) + if (!non_blocking || is_non_blocking(impl) + || set_non_blocking(impl, op->ec_)) { - this->get_io_service().post(bind_handler(handler, ec)); + reactor_.start_op(op_type, impl.socket_, + impl.reactor_data_, op, non_blocking); return; } } - impl.flags_ |= implementation_type::internal_non_blocking; + else + op->ec_ = boost::asio::error::bad_descriptor; } - // Start the connect operation. The socket is already marked as non-blocking - // so the connection will take place asynchronously. - boost::system::error_code ec; - if (socket_ops::connect(impl.socket_, peer_endpoint.data(), - peer_endpoint.size(), ec) == 0) + io_service_impl_.post_immediate_completion(op); + } + + // Start the asynchronous accept operation. + void start_accept_op(implementation_type& impl, + reactor_op* op, bool peer_is_open) + { + if (!peer_is_open) + start_op(impl, reactor::read_op, op, true, false); + else { - // The connect operation has finished successfully so we need to post the - // handler immediately. - this->get_io_service().post(bind_handler(handler, - boost::system::error_code())); + op->ec_ = boost::asio::error::already_open; + io_service_impl_.post_immediate_completion(op); } - else if (ec == boost::asio::error::in_progress - || ec == boost::asio::error::would_block) + } + + // Start the asynchronous connect operation. + void start_connect_op(implementation_type& impl, + reactor_op* op, const endpoint_type& peer_endpoint) + { + if (is_open(impl)) { - // The connection is happening in the background, and we need to wait - // until the socket becomes writeable. - reactor_.start_connect_op(impl.socket_, impl.reactor_data_, - connect_operation<Handler>(impl.socket_, - this->get_io_service(), handler)); + if (is_non_blocking(impl) || set_non_blocking(impl, op->ec_)) + { + if (socket_ops::connect(impl.socket_, peer_endpoint.data(), + peer_endpoint.size(), op->ec_) != 0) + { + if (op->ec_ == boost::asio::error::in_progress + || op->ec_ == boost::asio::error::would_block) + { + op->ec_ = boost::system::error_code(); + reactor_.start_op(reactor::connect_op, + impl.socket_, impl.reactor_data_, op, false); + return; + } + } + } } else - { - // The connect operation has failed, so post the handler immediately. - this->get_io_service().post(bind_handler(handler, ec)); - } + op->ec_ = boost::asio::error::bad_descriptor; + + io_service_impl_.post_immediate_completion(op); } -private: + // Determine whether the socket has been set non-blocking. + bool is_non_blocking(implementation_type& impl) const + { + return (impl.flags_ & implementation_type::non_blocking); + } + + // Set the internal non-blocking flag. + bool set_non_blocking(implementation_type& impl, + boost::system::error_code& ec) + { + ioctl_arg_type non_blocking = 1; + if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) + return false; + impl.flags_ |= implementation_type::internal_non_blocking; + return true; + } + + // The io_service implementation used to post completions. + io_service_impl& io_service_impl_; + // The selector that performs event demultiplexing for the service. - Reactor& reactor_; + reactor& reactor_; }; } // namespace detail |