diff options
Diffstat (limited to '3rdParty/Boost/src/boost/asio/detail/win_iocp_socket_service.hpp')
-rw-r--r-- | 3rdParty/Boost/src/boost/asio/detail/win_iocp_socket_service.hpp | 1693 |
1 files changed, 652 insertions, 1041 deletions
diff --git a/3rdParty/Boost/src/boost/asio/detail/win_iocp_socket_service.hpp b/3rdParty/Boost/src/boost/asio/detail/win_iocp_socket_service.hpp index e0c0806..5d545a8 100644 --- a/3rdParty/Boost/src/boost/asio/detail/win_iocp_socket_service.hpp +++ b/3rdParty/Boost/src/boost/asio/detail/win_iocp_socket_service.hpp @@ -28,15 +28,19 @@ #include <boost/weak_ptr.hpp> #include <boost/asio/detail/pop_options.hpp> -#include <boost/asio/buffer.hpp> #include <boost/asio/error.hpp> #include <boost/asio/io_service.hpp> #include <boost/asio/socket_base.hpp> #include <boost/asio/detail/bind_handler.hpp> +#include <boost/asio/detail/buffer_sequence_adapter.hpp> +#include <boost/asio/detail/fenced_block.hpp> #include <boost/asio/detail/handler_alloc_helpers.hpp> #include <boost/asio/detail/handler_invoke_helpers.hpp> #include <boost/asio/detail/mutex.hpp> -#include <boost/asio/detail/select_reactor.hpp> +#include <boost/asio/detail/null_buffers_op.hpp> +#include <boost/asio/detail/operation.hpp> +#include <boost/asio/detail/reactor.hpp> +#include <boost/asio/detail/reactor_op.hpp> #include <boost/asio/detail/socket_holder.hpp> #include <boost/asio/detail/socket_ops.hpp> #include <boost/asio/detail/socket_types.hpp> @@ -48,7 +52,6 @@ namespace detail { template <typename Protocol> class win_iocp_socket_service - : public boost::asio::detail::service_base<win_iocp_socket_service<Protocol> > { public: // The protocol type. @@ -57,9 +60,6 @@ public: // The endpoint type. typedef typename Protocol::endpoint endpoint_type; - // Base class for all operations. - typedef win_iocp_io_service::operation operation; - struct noop_deleter { void operator()(void*) {} }; typedef boost::shared_ptr<void> shared_cancel_token_type; typedef boost::weak_ptr<void> weak_cancel_token_type; @@ -114,9 +114,6 @@ public: endpoint_type remote_endpoint_; }; - // The type of the reactor used for connect operations. - typedef detail::select_reactor<true> reactor_type; - // The implementation type of the socket. class implementation_type { @@ -161,7 +158,7 @@ public: protocol_type protocol_; // Per-descriptor data used by the reactor. - reactor_type::per_descriptor_data reactor_data_; + reactor::per_descriptor_data reactor_data_; #if defined(BOOST_ASIO_ENABLE_CANCELIO) // The ID of the thread from which it is safe to cancel asynchronous @@ -176,14 +173,10 @@ public: implementation_type* prev_; }; - // The maximum number of buffers to support in a single operation. - enum { max_buffers = 64 < max_iov_len ? 64 : max_iov_len }; - // Constructor. win_iocp_socket_service(boost::asio::io_service& io_service) - : boost::asio::detail::service_base< - win_iocp_socket_service<Protocol> >(io_service), - iocp_service_(boost::asio::use_service<win_iocp_io_service>(io_service)), + : io_service_(io_service), + iocp_service_(use_service<win_iocp_io_service>(io_service)), reactor_(0), mutex_(), impl_list_(0) @@ -304,11 +297,11 @@ public: // Check if the reactor was created, in which case we need to close the // socket on the reactor as well to cancel any operations that might be // running there. - reactor_type* reactor = static_cast<reactor_type*>( + reactor* r = static_cast<reactor*>( interlocked_compare_exchange_pointer( reinterpret_cast<void**>(&reactor_), 0, 0)); - if (reactor) - reactor->close_descriptor(impl.socket_, impl.reactor_data_); + if (r) + r->close_descriptor(impl.socket_, impl.reactor_data_); if (socket_ops::close(impl.socket_, ec) == socket_error_retval) return ec; @@ -665,23 +658,11 @@ public: return 0; } - // Copy buffers into WSABUF array. - ::WSABUF bufs[max_buffers]; - typename ConstBufferSequence::const_iterator iter = buffers.begin(); - typename ConstBufferSequence::const_iterator end = buffers.end(); - DWORD i = 0; - size_t total_buffer_size = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::const_buffer buffer(*iter); - bufs[i].len = static_cast<u_long>(boost::asio::buffer_size(buffer)); - bufs[i].buf = const_cast<char*>( - boost::asio::buffer_cast<const char*>(buffer)); - total_buffer_size += boost::asio::buffer_size(buffer); - } + buffer_sequence_adapter<boost::asio::const_buffer, + ConstBufferSequence> bufs(buffers); // A request to receive 0 bytes on a stream socket is a no-op. - if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0) + if (impl.protocol_.type() == SOCK_STREAM && bufs.all_empty()) { ec = boost::system::error_code(); return 0; @@ -689,8 +670,8 @@ public: // Send the data. DWORD bytes_transferred = 0; - int result = ::WSASend(impl.socket_, bufs, - i, &bytes_transferred, flags, 0, 0); + int result = ::WSASend(impl.socket_, bufs.buffers(), + bufs.count(), &bytes_transferred, flags, 0, 0); if (result != 0) { DWORD last_error = ::WSAGetLastError(); @@ -724,94 +705,63 @@ public: } template <typename ConstBufferSequence, typename Handler> - class send_operation - : public operation + class send_op : public operation { public: - send_operation(win_iocp_io_service& io_service, - weak_cancel_token_type cancel_token, + send_op(weak_cancel_token_type cancel_token, const ConstBufferSequence& buffers, Handler handler) - : operation(io_service, - &send_operation<ConstBufferSequence, Handler>::do_completion_impl, - &send_operation<ConstBufferSequence, Handler>::destroy_impl), - work_(io_service.get_io_service()), + : operation(&send_op::do_complete), cancel_token_(cancel_token), buffers_(buffers), handler_(handler) { } - private: - static void do_completion_impl(operation* op, - DWORD last_error, size_t bytes_transferred) + static void do_complete(io_service_impl* owner, operation* base, + boost::system::error_code ec, std::size_t bytes_transferred) { // Take ownership of the operation object. - typedef send_operation<ConstBufferSequence, Handler> op_type; - op_type* handler_op(static_cast<op_type*>(op)); - typedef handler_alloc_traits<Handler, op_type> alloc_traits; - handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); + send_op* o(static_cast<send_op*>(base)); + typedef handler_alloc_traits<Handler, send_op> alloc_traits; + handler_ptr<alloc_traits> ptr(o->handler_, o); -#if defined(BOOST_ASIO_ENABLE_BUFFER_DEBUGGING) - // Check whether buffers are still valid. - typename ConstBufferSequence::const_iterator iter - = handler_op->buffers_.begin(); - typename ConstBufferSequence::const_iterator end - = handler_op->buffers_.end(); - while (iter != end) + // Make the upcall if required. + if (owner) { - boost::asio::const_buffer buffer(*iter); - boost::asio::buffer_cast<const char*>(buffer); - ++iter; - } +#if defined(BOOST_ASIO_ENABLE_BUFFER_DEBUGGING) + // Check whether buffers are still valid. + buffer_sequence_adapter<boost::asio::const_buffer, + ConstBufferSequence>::validate(o->buffers_); #endif // defined(BOOST_ASIO_ENABLE_BUFFER_DEBUGGING) - // Map non-portable errors to their portable counterparts. - boost::system::error_code ec(last_error, - boost::asio::error::get_system_category()); - if (ec.value() == ERROR_NETNAME_DELETED) - { - if (handler_op->cancel_token_.expired()) - ec = boost::asio::error::operation_aborted; - else - ec = boost::asio::error::connection_reset; - } - else if (ec.value() == ERROR_PORT_UNREACHABLE) - { - ec = boost::asio::error::connection_refused; - } - - // Make a copy of the handler so that the memory can be deallocated before - // the upcall is made. - Handler handler(handler_op->handler_); - - // Free the memory associated with the handler. - ptr.reset(); - - // Call the handler. - boost_asio_handler_invoke_helpers::invoke( - detail::bind_handler(handler, ec, bytes_transferred), handler); - } - - static void destroy_impl(operation* op) - { - // Take ownership of the operation object. - typedef send_operation<ConstBufferSequence, Handler> op_type; - op_type* handler_op(static_cast<op_type*>(op)); - typedef handler_alloc_traits<Handler, op_type> alloc_traits; - handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); - - // A sub-object of the handler may be the true owner of the memory - // associated with the handler. Consequently, a local copy of the handler - // is required to ensure that any owning sub-object remains valid until - // after we have deallocated the memory here. - Handler handler(handler_op->handler_); - (void)handler; + // Map non-portable errors to their portable counterparts. + if (ec.value() == ERROR_NETNAME_DELETED) + { + if (o->cancel_token_.expired()) + ec = boost::asio::error::operation_aborted; + else + ec = boost::asio::error::connection_reset; + } + else if (ec.value() == ERROR_PORT_UNREACHABLE) + { + ec = boost::asio::error::connection_refused; + } - // Free the memory associated with the handler. - ptr.reset(); + // Make a copy of the handler so that the memory can be deallocated + // before the upcall is made. Even if we're not about to make an + // upcall, a sub-object of the handler may be the true owner of the + // memory associated with the handler. Consequently, a local copy of + // the handler is required to ensure that any owning sub-object remains + // valid until after we have deallocated the memory here. + detail::binder2<Handler, boost::system::error_code, std::size_t> + handler(o->handler_, ec, bytes_transferred); + ptr.reset(); + boost::asio::detail::fenced_block b; + boost_asio_handler_invoke_helpers::invoke(handler, handler); + } } - boost::asio::io_service::work work_; + private: weak_cancel_token_type cancel_token_; ConstBufferSequence buffers_; Handler handler_; @@ -823,127 +773,34 @@ public: void async_send(implementation_type& impl, const ConstBufferSequence& buffers, socket_base::message_flags flags, Handler handler) { -#if defined(BOOST_ASIO_ENABLE_CANCELIO) - // Update the ID of the thread from which cancellation is safe. - if (impl.safe_cancellation_thread_id_ == 0) - impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId(); - else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId()) - impl.safe_cancellation_thread_id_ = ~DWORD(0); -#endif // defined(BOOST_ASIO_ENABLE_CANCELIO) - // Allocate and construct an operation to wrap the handler. - typedef send_operation<ConstBufferSequence, Handler> value_type; + typedef send_op<ConstBufferSequence, Handler> value_type; typedef handler_alloc_traits<Handler, value_type> alloc_traits; raw_handler_ptr<alloc_traits> raw_ptr(handler); - handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_, + handler_ptr<alloc_traits> ptr(raw_ptr, impl.cancel_token_, buffers, handler); - if (!is_open(impl)) - { - ptr.get()->on_immediate_completion(WSAEBADF, 0); - ptr.release(); - return; - } - - // Copy buffers into WSABUF array. - ::WSABUF bufs[max_buffers]; - typename ConstBufferSequence::const_iterator iter = buffers.begin(); - typename ConstBufferSequence::const_iterator end = buffers.end(); - DWORD i = 0; - size_t total_buffer_size = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::const_buffer buffer(*iter); - bufs[i].len = static_cast<u_long>(boost::asio::buffer_size(buffer)); - bufs[i].buf = const_cast<char*>( - boost::asio::buffer_cast<const char*>(buffer)); - total_buffer_size += boost::asio::buffer_size(buffer); - } - - // A request to receive 0 bytes on a stream socket is a no-op. - if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0) - { - ptr.get()->on_immediate_completion(0, 0); - ptr.release(); - return; - } - - // Send the data. - DWORD bytes_transferred = 0; - int result = ::WSASend(impl.socket_, bufs, i, - &bytes_transferred, flags, ptr.get(), 0); - DWORD last_error = ::WSAGetLastError(); + buffer_sequence_adapter<boost::asio::const_buffer, + ConstBufferSequence> bufs(buffers); - // Check if the operation completed immediately. - if (result != 0 && last_error != WSA_IO_PENDING) - { - ptr.get()->on_immediate_completion(last_error, bytes_transferred); - ptr.release(); - } - else - { - ptr.get()->on_pending(); - ptr.release(); - } + start_send_op(impl, bufs.buffers(), bufs.count(), flags, + impl.protocol_.type() == SOCK_STREAM && bufs.all_empty(), ptr.get()); + ptr.release(); } - template <typename Handler> - class null_buffers_operation - { - public: - null_buffers_operation(boost::asio::io_service& io_service, Handler handler) - : work_(io_service), - handler_(handler) - { - } - - bool perform(boost::system::error_code&, - std::size_t& bytes_transferred) - { - bytes_transferred = 0; - return true; - } - - void complete(const boost::system::error_code& ec, - std::size_t bytes_transferred) - { - work_.get_io_service().post(bind_handler( - handler_, ec, bytes_transferred)); - } - - private: - boost::asio::io_service::work work_; - Handler handler_; - }; - // Start an asynchronous wait until data can be sent without blocking. template <typename Handler> void async_send(implementation_type& impl, const null_buffers&, socket_base::message_flags, Handler handler) { - if (!is_open(impl)) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); - } - else - { - // Check if the reactor was already obtained from the io_service. - reactor_type* reactor = static_cast<reactor_type*>( - interlocked_compare_exchange_pointer( - reinterpret_cast<void**>(&reactor_), 0, 0)); - if (!reactor) - { - reactor = &(boost::asio::use_service<reactor_type>( - this->get_io_service())); - interlocked_exchange_pointer( - reinterpret_cast<void**>(&reactor_), reactor); - } + // Allocate and construct an operation to wrap the handler. + typedef null_buffers_op<Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, handler); - reactor->start_write_op(impl.socket_, impl.reactor_data_, - null_buffers_operation<Handler>(this->get_io_service(), handler), - false); - } + start_reactor_op(impl, reactor::write_op, ptr.get()); + ptr.release(); } // Send a datagram to the specified endpoint. Returns the number of bytes @@ -959,23 +816,14 @@ public: return 0; } - // Copy buffers into WSABUF array. - ::WSABUF bufs[max_buffers]; - typename ConstBufferSequence::const_iterator iter = buffers.begin(); - typename ConstBufferSequence::const_iterator end = buffers.end(); - DWORD i = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::const_buffer buffer(*iter); - bufs[i].len = static_cast<u_long>(boost::asio::buffer_size(buffer)); - bufs[i].buf = const_cast<char*>( - boost::asio::buffer_cast<const char*>(buffer)); - } + buffer_sequence_adapter<boost::asio::const_buffer, + ConstBufferSequence> bufs(buffers); // Send the data. DWORD bytes_transferred = 0; - int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred, - flags, destination.data(), static_cast<int>(destination.size()), 0, 0); + int result = ::WSASendTo(impl.socket_, bufs.buffers(), bufs.count(), + &bytes_transferred, flags, destination.data(), + static_cast<int>(destination.size()), 0, 0); if (result != 0) { DWORD last_error = ::WSAGetLastError(); @@ -1008,85 +856,57 @@ public: } template <typename ConstBufferSequence, typename Handler> - class send_to_operation - : public operation + class send_to_op : public operation { public: - send_to_operation(win_iocp_io_service& io_service, + send_to_op(weak_cancel_token_type cancel_token, const ConstBufferSequence& buffers, Handler handler) - : operation(io_service, - &send_to_operation<ConstBufferSequence, Handler>::do_completion_impl, - &send_to_operation<ConstBufferSequence, Handler>::destroy_impl), - work_(io_service.get_io_service()), + : operation(&send_to_op::do_complete), + cancel_token_(cancel_token), buffers_(buffers), handler_(handler) { } - private: - static void do_completion_impl(operation* op, - DWORD last_error, size_t bytes_transferred) + static void do_complete(io_service_impl* owner, operation* base, + boost::system::error_code ec, std::size_t bytes_transferred) { // Take ownership of the operation object. - typedef send_to_operation<ConstBufferSequence, Handler> op_type; - op_type* handler_op(static_cast<op_type*>(op)); - typedef handler_alloc_traits<Handler, op_type> alloc_traits; - handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); + send_to_op* o(static_cast<send_to_op*>(base)); + typedef handler_alloc_traits<Handler, send_to_op> alloc_traits; + handler_ptr<alloc_traits> ptr(o->handler_, o); -#if defined(BOOST_ASIO_ENABLE_BUFFER_DEBUGGING) - // Check whether buffers are still valid. - typename ConstBufferSequence::const_iterator iter - = handler_op->buffers_.begin(); - typename ConstBufferSequence::const_iterator end - = handler_op->buffers_.end(); - while (iter != end) + // Make the upcall if required. + if (owner) { - boost::asio::const_buffer buffer(*iter); - boost::asio::buffer_cast<const char*>(buffer); - ++iter; - } +#if defined(BOOST_ASIO_ENABLE_BUFFER_DEBUGGING) + // Check whether buffers are still valid. + buffer_sequence_adapter<boost::asio::const_buffer, + ConstBufferSequence>::validate(o->buffers_); #endif // defined(BOOST_ASIO_ENABLE_BUFFER_DEBUGGING) - // Map non-portable errors to their portable counterparts. - boost::system::error_code ec(last_error, - boost::asio::error::get_system_category()); - if (ec.value() == ERROR_PORT_UNREACHABLE) - { - ec = boost::asio::error::connection_refused; - } - - // Make a copy of the handler so that the memory can be deallocated before - // the upcall is made. - Handler handler(handler_op->handler_); - - // Free the memory associated with the handler. - ptr.reset(); - - // Call the handler. - boost_asio_handler_invoke_helpers::invoke( - detail::bind_handler(handler, ec, bytes_transferred), handler); - } - - static void destroy_impl(operation* op) - { - // Take ownership of the operation object. - typedef send_to_operation<ConstBufferSequence, Handler> op_type; - op_type* handler_op(static_cast<op_type*>(op)); - typedef handler_alloc_traits<Handler, op_type> alloc_traits; - handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); - - // A sub-object of the handler may be the true owner of the memory - // associated with the handler. Consequently, a local copy of the handler - // is required to ensure that any owning sub-object remains valid until - // after we have deallocated the memory here. - Handler handler(handler_op->handler_); - (void)handler; + // Map non-portable errors to their portable counterparts. + if (ec.value() == ERROR_PORT_UNREACHABLE) + { + ec = boost::asio::error::connection_refused; + } - // Free the memory associated with the handler. - ptr.reset(); + // Make a copy of the handler so that the memory can be deallocated + // before the upcall is made. Even if we're not about to make an + // upcall, a sub-object of the handler may be the true owner of the + // memory associated with the handler. Consequently, a local copy of + // the handler is required to ensure that any owning sub-object remains + // valid until after we have deallocated the memory here. + detail::binder2<Handler, boost::system::error_code, std::size_t> + handler(o->handler_, ec, bytes_transferred); + ptr.reset(); + boost::asio::detail::fenced_block b; + boost_asio_handler_invoke_helpers::invoke(handler, handler); + } } - boost::asio::io_service::work work_; + private: + weak_cancel_token_type cancel_token_; ConstBufferSequence buffers_; Handler handler_; }; @@ -1098,57 +918,19 @@ public: const ConstBufferSequence& buffers, const endpoint_type& destination, socket_base::message_flags flags, Handler handler) { -#if defined(BOOST_ASIO_ENABLE_CANCELIO) - // Update the ID of the thread from which cancellation is safe. - if (impl.safe_cancellation_thread_id_ == 0) - impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId(); - else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId()) - impl.safe_cancellation_thread_id_ = ~DWORD(0); -#endif // defined(BOOST_ASIO_ENABLE_CANCELIO) - // Allocate and construct an operation to wrap the handler. - typedef send_to_operation<ConstBufferSequence, Handler> value_type; + typedef send_to_op<ConstBufferSequence, Handler> value_type; typedef handler_alloc_traits<Handler, value_type> alloc_traits; raw_handler_ptr<alloc_traits> raw_ptr(handler); - handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_, buffers, handler); - - if (!is_open(impl)) - { - ptr.get()->on_immediate_completion(WSAEBADF, 0); - ptr.release(); - return; - } - - // Copy buffers into WSABUF array. - ::WSABUF bufs[max_buffers]; - typename ConstBufferSequence::const_iterator iter = buffers.begin(); - typename ConstBufferSequence::const_iterator end = buffers.end(); - DWORD i = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::const_buffer buffer(*iter); - bufs[i].len = static_cast<u_long>(boost::asio::buffer_size(buffer)); - bufs[i].buf = const_cast<char*>( - boost::asio::buffer_cast<const char*>(buffer)); - } + handler_ptr<alloc_traits> ptr(raw_ptr, + impl.cancel_token_, buffers, handler); - // Send the data. - DWORD bytes_transferred = 0; - int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred, flags, - destination.data(), static_cast<int>(destination.size()), ptr.get(), 0); - DWORD last_error = ::WSAGetLastError(); + buffer_sequence_adapter<boost::asio::const_buffer, + ConstBufferSequence> bufs(buffers); - // Check if the operation completed immediately. - if (result != 0 && last_error != WSA_IO_PENDING) - { - ptr.get()->on_immediate_completion(last_error, bytes_transferred); - ptr.release(); - } - else - { - ptr.get()->on_pending(); - ptr.release(); - } + start_send_to_op(impl, bufs.buffers(), + bufs.count(), destination, flags, ptr.get()); + ptr.release(); } // Start an asynchronous wait until data can be sent without blocking. @@ -1156,29 +938,14 @@ public: void async_send_to(implementation_type& impl, const null_buffers&, socket_base::message_flags, const endpoint_type&, Handler handler) { - if (!is_open(impl)) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); - } - else - { - // Check if the reactor was already obtained from the io_service. - reactor_type* reactor = static_cast<reactor_type*>( - interlocked_compare_exchange_pointer( - reinterpret_cast<void**>(&reactor_), 0, 0)); - if (!reactor) - { - reactor = &(boost::asio::use_service<reactor_type>( - this->get_io_service())); - interlocked_exchange_pointer( - reinterpret_cast<void**>(&reactor_), reactor); - } + // Allocate and construct an operation to wrap the handler. + typedef null_buffers_op<Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, handler); - reactor->start_write_op(impl.socket_, impl.reactor_data_, - null_buffers_operation<Handler>(this->get_io_service(), handler), - false); - } + start_reactor_op(impl, reactor::write_op, ptr.get()); + ptr.release(); } // Receive some data from the peer. Returns the number of bytes received. @@ -1193,22 +960,11 @@ public: return 0; } - // Copy buffers into WSABUF array. - ::WSABUF bufs[max_buffers]; - typename MutableBufferSequence::const_iterator iter = buffers.begin(); - typename MutableBufferSequence::const_iterator end = buffers.end(); - DWORD i = 0; - size_t total_buffer_size = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::mutable_buffer buffer(*iter); - bufs[i].len = static_cast<u_long>(boost::asio::buffer_size(buffer)); - bufs[i].buf = boost::asio::buffer_cast<char*>(buffer); - total_buffer_size += boost::asio::buffer_size(buffer); - } + buffer_sequence_adapter<boost::asio::mutable_buffer, + MutableBufferSequence> bufs(buffers); // A request to receive 0 bytes on a stream socket is a no-op. - if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0) + if (impl.protocol_.type() == SOCK_STREAM && bufs.all_empty()) { ec = boost::system::error_code(); return 0; @@ -1217,8 +973,8 @@ public: // Receive some data. DWORD bytes_transferred = 0; DWORD recv_flags = flags; - int result = ::WSARecv(impl.socket_, bufs, i, - &bytes_transferred, &recv_flags, 0, 0); + int result = ::WSARecv(impl.socket_, bufs.buffers(), + bufs.count(), &bytes_transferred, &recv_flags, 0, 0); if (result != 0) { DWORD last_error = ::WSAGetLastError(); @@ -1257,106 +1013,75 @@ public: } template <typename MutableBufferSequence, typename Handler> - class receive_operation - : public operation + class receive_op : public operation { public: - receive_operation(int protocol_type, win_iocp_io_service& io_service, - weak_cancel_token_type cancel_token, + receive_op(int protocol_type, weak_cancel_token_type cancel_token, const MutableBufferSequence& buffers, Handler handler) - : operation(io_service, - &receive_operation< - MutableBufferSequence, Handler>::do_completion_impl, - &receive_operation< - MutableBufferSequence, Handler>::destroy_impl), + : operation(&receive_op::do_complete), protocol_type_(protocol_type), - work_(io_service.get_io_service()), cancel_token_(cancel_token), buffers_(buffers), handler_(handler) { } - private: - static void do_completion_impl(operation* op, - DWORD last_error, size_t bytes_transferred) + static void do_complete(io_service_impl* owner, operation* base, + boost::system::error_code ec, std::size_t bytes_transferred) { // Take ownership of the operation object. - typedef receive_operation<MutableBufferSequence, Handler> op_type; - op_type* handler_op(static_cast<op_type*>(op)); - typedef handler_alloc_traits<Handler, op_type> alloc_traits; - handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); + receive_op* o(static_cast<receive_op*>(base)); + typedef handler_alloc_traits<Handler, receive_op> alloc_traits; + handler_ptr<alloc_traits> ptr(o->handler_, o); -#if defined(BOOST_ASIO_ENABLE_BUFFER_DEBUGGING) - // Check whether buffers are still valid. - typename MutableBufferSequence::const_iterator iter - = handler_op->buffers_.begin(); - typename MutableBufferSequence::const_iterator end - = handler_op->buffers_.end(); - while (iter != end) + // Make the upcall if required. + if (owner) { - boost::asio::mutable_buffer buffer(*iter); - boost::asio::buffer_cast<char*>(buffer); - ++iter; - } +#if defined(BOOST_ASIO_ENABLE_BUFFER_DEBUGGING) + // Check whether buffers are still valid. + buffer_sequence_adapter<boost::asio::mutable_buffer, + MutableBufferSequence>::validate(o->buffers_); #endif // defined(BOOST_ASIO_ENABLE_BUFFER_DEBUGGING) - // Map non-portable errors to their portable counterparts. - boost::system::error_code ec(last_error, - boost::asio::error::get_system_category()); - if (ec.value() == ERROR_NETNAME_DELETED) - { - if (handler_op->cancel_token_.expired()) - ec = boost::asio::error::operation_aborted; - else - ec = boost::asio::error::connection_reset; - } - else if (ec.value() == ERROR_PORT_UNREACHABLE) - { - ec = boost::asio::error::connection_refused; - } - - // Check for connection closed. - else if (!ec && bytes_transferred == 0 - && handler_op->protocol_type_ == SOCK_STREAM - && !boost::is_same<MutableBufferSequence, null_buffers>::value) - { - ec = boost::asio::error::eof; - } - - // Make a copy of the handler so that the memory can be deallocated before - // the upcall is made. - Handler handler(handler_op->handler_); - - // Free the memory associated with the handler. - ptr.reset(); - - // Call the handler. - boost_asio_handler_invoke_helpers::invoke( - detail::bind_handler(handler, ec, bytes_transferred), handler); - } - - static void destroy_impl(operation* op) - { - // Take ownership of the operation object. - typedef receive_operation<MutableBufferSequence, Handler> op_type; - op_type* handler_op(static_cast<op_type*>(op)); - typedef handler_alloc_traits<Handler, op_type> alloc_traits; - handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); + // Map non-portable errors to their portable counterparts. + if (ec.value() == ERROR_NETNAME_DELETED) + { + if (o->cancel_token_.expired()) + ec = boost::asio::error::operation_aborted; + else + ec = boost::asio::error::connection_reset; + } + else if (ec.value() == ERROR_PORT_UNREACHABLE) + { + ec = boost::asio::error::connection_refused; + } - // A sub-object of the handler may be the true owner of the memory - // associated with the handler. Consequently, a local copy of the handler - // is required to ensure that any owning sub-object remains valid until - // after we have deallocated the memory here. - Handler handler(handler_op->handler_); - (void)handler; + // Check for connection closed. + else if (!ec && bytes_transferred == 0 + && o->protocol_type_ == SOCK_STREAM + && !buffer_sequence_adapter<boost::asio::mutable_buffer, + MutableBufferSequence>::all_empty(o->buffers_) + && !boost::is_same<MutableBufferSequence, null_buffers>::value) + { + ec = boost::asio::error::eof; + } - // Free the memory associated with the handler. - ptr.reset(); + // Make a copy of the handler so that the memory can be deallocated + // before the upcall is made. Even if we're not about to make an + // upcall, a sub-object of the handler may be the true owner of the + // memory associated with the handler. Consequently, a local copy of + // the handler is required to ensure that any owning sub-object remains + // valid until after we have deallocated the memory here. + detail::binder2<Handler, boost::system::error_code, std::size_t> + handler(o->handler_, ec, bytes_transferred); + ptr.reset(); + boost::asio::detail::fenced_block b; + boost_asio_handler_invoke_helpers::invoke(handler, handler); + } } + private: int protocol_type_; - boost::asio::io_service::work work_; weak_cancel_token_type cancel_token_; MutableBufferSequence buffers_; Handler handler_; @@ -1369,67 +1094,20 @@ public: const MutableBufferSequence& buffers, socket_base::message_flags flags, Handler handler) { -#if defined(BOOST_ASIO_ENABLE_CANCELIO) - // Update the ID of the thread from which cancellation is safe. - if (impl.safe_cancellation_thread_id_ == 0) - impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId(); - else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId()) - impl.safe_cancellation_thread_id_ = ~DWORD(0); -#endif // defined(BOOST_ASIO_ENABLE_CANCELIO) - // Allocate and construct an operation to wrap the handler. - typedef receive_operation<MutableBufferSequence, Handler> value_type; + typedef receive_op<MutableBufferSequence, Handler> value_type; typedef handler_alloc_traits<Handler, value_type> alloc_traits; raw_handler_ptr<alloc_traits> raw_ptr(handler); int protocol_type = impl.protocol_.type(); handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type, - iocp_service_, impl.cancel_token_, buffers, handler); - - if (!is_open(impl)) - { - ptr.get()->on_immediate_completion(WSAEBADF, 0); - ptr.release(); - return; - } - - // Copy buffers into WSABUF array. - ::WSABUF bufs[max_buffers]; - typename MutableBufferSequence::const_iterator iter = buffers.begin(); - typename MutableBufferSequence::const_iterator end = buffers.end(); - DWORD i = 0; - size_t total_buffer_size = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::mutable_buffer buffer(*iter); - bufs[i].len = static_cast<u_long>(boost::asio::buffer_size(buffer)); - bufs[i].buf = boost::asio::buffer_cast<char*>(buffer); - total_buffer_size += boost::asio::buffer_size(buffer); - } + impl.cancel_token_, buffers, handler); - // A request to receive 0 bytes on a stream socket is a no-op. - if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0) - { - ptr.get()->on_immediate_completion(0, 0); - ptr.release(); - return; - } + buffer_sequence_adapter<boost::asio::mutable_buffer, + MutableBufferSequence> bufs(buffers); - // Receive some data. - DWORD bytes_transferred = 0; - DWORD recv_flags = flags; - int result = ::WSARecv(impl.socket_, bufs, i, - &bytes_transferred, &recv_flags, ptr.get(), 0); - DWORD last_error = ::WSAGetLastError(); - if (result != 0 && last_error != WSA_IO_PENDING) - { - ptr.get()->on_immediate_completion(last_error, bytes_transferred); - ptr.release(); - } - else - { - ptr.get()->on_pending(); - ptr.release(); - } + start_receive_op(impl, bufs.buffers(), bufs.count(), flags, + protocol_type == SOCK_STREAM && bufs.all_empty(), ptr.get()); + ptr.release(); } // Wait until data can be received without blocking. @@ -1437,75 +1115,36 @@ public: void async_receive(implementation_type& impl, const null_buffers& buffers, socket_base::message_flags flags, Handler handler) { - if (!is_open(impl)) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); - } - else if (impl.protocol_.type() == SOCK_STREAM) + if (impl.protocol_.type() == SOCK_STREAM) { // For stream sockets on Windows, we may issue a 0-byte overlapped // WSARecv to wait until there is data available on the socket. -#if defined(BOOST_ASIO_ENABLE_CANCELIO) - // Update the ID of the thread from which cancellation is safe. - if (impl.safe_cancellation_thread_id_ == 0) - impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId(); - else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId()) - impl.safe_cancellation_thread_id_ = ~DWORD(0); -#endif // defined(BOOST_ASIO_ENABLE_CANCELIO) - // Allocate and construct an operation to wrap the handler. - typedef receive_operation<null_buffers, Handler> value_type; + typedef receive_op<null_buffers, Handler> value_type; typedef handler_alloc_traits<Handler, value_type> alloc_traits; raw_handler_ptr<alloc_traits> raw_ptr(handler); int protocol_type = impl.protocol_.type(); handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type, - iocp_service_, impl.cancel_token_, buffers, handler); + impl.cancel_token_, buffers, handler); - // Issue a receive operation with an empty buffer. ::WSABUF buf = { 0, 0 }; - DWORD bytes_transferred = 0; - DWORD recv_flags = flags; - int result = ::WSARecv(impl.socket_, &buf, 1, - &bytes_transferred, &recv_flags, ptr.get(), 0); - DWORD last_error = ::WSAGetLastError(); - if (result != 0 && last_error != WSA_IO_PENDING) - { - ptr.get()->on_immediate_completion(last_error, bytes_transferred); - ptr.release(); - } - else - { - ptr.get()->on_pending(); - ptr.release(); - } + start_receive_op(impl, &buf, 1, flags, false, ptr.get()); + ptr.release(); } else { - // Check if the reactor was already obtained from the io_service. - reactor_type* reactor = static_cast<reactor_type*>( - interlocked_compare_exchange_pointer( - reinterpret_cast<void**>(&reactor_), 0, 0)); - if (!reactor) - { - reactor = &(boost::asio::use_service<reactor_type>( - this->get_io_service())); - interlocked_exchange_pointer( - reinterpret_cast<void**>(&reactor_), reactor); - } + // Allocate and construct an operation to wrap the handler. + typedef null_buffers_op<Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, handler); - if (flags & socket_base::message_out_of_band) - { - reactor->start_except_op(impl.socket_, impl.reactor_data_, - null_buffers_operation<Handler>(this->get_io_service(), handler)); - } - else - { - reactor->start_read_op(impl.socket_, impl.reactor_data_, - null_buffers_operation<Handler>(this->get_io_service(), handler), - false); - } + start_reactor_op(impl, + (flags & socket_base::message_out_of_band) + ? reactor::except_op : reactor::read_op, + ptr.get()); + ptr.release(); } } @@ -1523,24 +1162,16 @@ public: return 0; } - // Copy buffers into WSABUF array. - ::WSABUF bufs[max_buffers]; - typename MutableBufferSequence::const_iterator iter = buffers.begin(); - typename MutableBufferSequence::const_iterator end = buffers.end(); - DWORD i = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::mutable_buffer buffer(*iter); - bufs[i].len = static_cast<u_long>(boost::asio::buffer_size(buffer)); - bufs[i].buf = boost::asio::buffer_cast<char*>(buffer); - } + buffer_sequence_adapter<boost::asio::mutable_buffer, + MutableBufferSequence> bufs(buffers); // Receive some data. DWORD bytes_transferred = 0; DWORD recv_flags = flags; int endpoint_size = static_cast<int>(sender_endpoint.capacity()); - int result = ::WSARecvFrom(impl.socket_, bufs, i, &bytes_transferred, - &recv_flags, sender_endpoint.data(), &endpoint_size, 0, 0); + int result = ::WSARecvFrom(impl.socket_, bufs.buffers(), + bufs.count(), &bytes_transferred, &recv_flags, + sender_endpoint.data(), &endpoint_size, 0, 0); if (result != 0) { DWORD last_error = ::WSAGetLastError(); @@ -1583,22 +1214,15 @@ public: } template <typename MutableBufferSequence, typename Handler> - class receive_from_operation - : public operation + class receive_from_op : public operation { public: - receive_from_operation(int protocol_type, win_iocp_io_service& io_service, - endpoint_type& endpoint, const MutableBufferSequence& buffers, - Handler handler) - : operation(io_service, - &receive_from_operation< - MutableBufferSequence, Handler>::do_completion_impl, - &receive_from_operation< - MutableBufferSequence, Handler>::destroy_impl), + receive_from_op(int protocol_type, endpoint_type& endpoint, + const MutableBufferSequence& buffers, Handler handler) + : operation(&receive_from_op::do_complete), protocol_type_(protocol_type), endpoint_(endpoint), endpoint_size_(static_cast<int>(endpoint.capacity())), - work_(io_service.get_io_service()), buffers_(buffers), handler_(handler) { @@ -1609,83 +1233,51 @@ public: return endpoint_size_; } - private: - static void do_completion_impl(operation* op, - DWORD last_error, size_t bytes_transferred) + static void do_complete(io_service_impl* owner, operation* base, + boost::system::error_code ec, std::size_t bytes_transferred) { // Take ownership of the operation object. - typedef receive_from_operation<MutableBufferSequence, Handler> op_type; - op_type* handler_op(static_cast<op_type*>(op)); - typedef handler_alloc_traits<Handler, op_type> alloc_traits; - handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); + receive_from_op* o(static_cast<receive_from_op*>(base)); + typedef handler_alloc_traits<Handler, receive_from_op> alloc_traits; + handler_ptr<alloc_traits> ptr(o->handler_, o); -#if defined(BOOST_ASIO_ENABLE_BUFFER_DEBUGGING) - // Check whether buffers are still valid. - typename MutableBufferSequence::const_iterator iter - = handler_op->buffers_.begin(); - typename MutableBufferSequence::const_iterator end - = handler_op->buffers_.end(); - while (iter != end) + // Make the upcall if required. + if (owner) { - boost::asio::mutable_buffer buffer(*iter); - boost::asio::buffer_cast<char*>(buffer); - ++iter; - } +#if defined(BOOST_ASIO_ENABLE_BUFFER_DEBUGGING) + // Check whether buffers are still valid. + buffer_sequence_adapter<boost::asio::mutable_buffer, + MutableBufferSequence>::validate(o->buffers_); #endif // defined(BOOST_ASIO_ENABLE_BUFFER_DEBUGGING) - // Map non-portable errors to their portable counterparts. - boost::system::error_code ec(last_error, - boost::asio::error::get_system_category()); - if (ec.value() == ERROR_PORT_UNREACHABLE) - { - ec = boost::asio::error::connection_refused; - } + // Map non-portable errors to their portable counterparts. + if (ec.value() == ERROR_PORT_UNREACHABLE) + { + ec = boost::asio::error::connection_refused; + } - // Check for connection closed. - if (!ec && bytes_transferred == 0 - && handler_op->protocol_type_ == SOCK_STREAM) - { - ec = boost::asio::error::eof; + // Record the size of the endpoint returned by the operation. + o->endpoint_.resize(o->endpoint_size_); + + // Make a copy of the handler so that the memory can be deallocated + // before the upcall is made. Even if we're not about to make an + // upcall, a sub-object of the handler may be the true owner of the + // memory associated with the handler. Consequently, a local copy of + // the handler is required to ensure that any owning sub-object remains + // valid until after we have deallocated the memory here. + detail::binder2<Handler, boost::system::error_code, std::size_t> + handler(o->handler_, ec, bytes_transferred); + ptr.reset(); + boost::asio::detail::fenced_block b; + boost_asio_handler_invoke_helpers::invoke(handler, handler); } - - // Record the size of the endpoint returned by the operation. - handler_op->endpoint_.resize(handler_op->endpoint_size_); - - // Make a copy of the handler so that the memory can be deallocated before - // the upcall is made. - Handler handler(handler_op->handler_); - - // Free the memory associated with the handler. - ptr.reset(); - - // Call the handler. - boost_asio_handler_invoke_helpers::invoke( - detail::bind_handler(handler, ec, bytes_transferred), handler); - } - - static void destroy_impl(operation* op) - { - // Take ownership of the operation object. - typedef receive_from_operation<MutableBufferSequence, Handler> op_type; - op_type* handler_op(static_cast<op_type*>(op)); - typedef handler_alloc_traits<Handler, op_type> alloc_traits; - handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); - - // A sub-object of the handler may be the true owner of the memory - // associated with the handler. Consequently, a local copy of the handler - // is required to ensure that any owning sub-object remains valid until - // after we have deallocated the memory here. - Handler handler(handler_op->handler_); - (void)handler; - - // Free the memory associated with the handler. - ptr.reset(); } + private: int protocol_type_; endpoint_type& endpoint_; int endpoint_size_; - boost::asio::io_service::work work_; + weak_cancel_token_type cancel_token_; MutableBufferSequence buffers_; Handler handler_; }; @@ -1698,58 +1290,20 @@ public: const MutableBufferSequence& buffers, endpoint_type& sender_endp, socket_base::message_flags flags, Handler handler) { -#if defined(BOOST_ASIO_ENABLE_CANCELIO) - // Update the ID of the thread from which cancellation is safe. - if (impl.safe_cancellation_thread_id_ == 0) - impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId(); - else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId()) - impl.safe_cancellation_thread_id_ = ~DWORD(0); -#endif // defined(BOOST_ASIO_ENABLE_CANCELIO) - // Allocate and construct an operation to wrap the handler. - typedef receive_from_operation<MutableBufferSequence, Handler> value_type; + typedef receive_from_op<MutableBufferSequence, Handler> value_type; typedef handler_alloc_traits<Handler, value_type> alloc_traits; raw_handler_ptr<alloc_traits> raw_ptr(handler); int protocol_type = impl.protocol_.type(); - handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type, - iocp_service_, sender_endp, buffers, handler); - - if (!is_open(impl)) - { - ptr.get()->on_immediate_completion(WSAEBADF, 0); - ptr.release(); - return; - } + handler_ptr<alloc_traits> ptr(raw_ptr, + protocol_type, sender_endp, buffers, handler); - // Copy buffers into WSABUF array. - ::WSABUF bufs[max_buffers]; - typename MutableBufferSequence::const_iterator iter = buffers.begin(); - typename MutableBufferSequence::const_iterator end = buffers.end(); - DWORD i = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::mutable_buffer buffer(*iter); - bufs[i].len = static_cast<u_long>(boost::asio::buffer_size(buffer)); - bufs[i].buf = boost::asio::buffer_cast<char*>(buffer); - } + buffer_sequence_adapter<boost::asio::mutable_buffer, + MutableBufferSequence> bufs(buffers); - // Receive some data. - DWORD bytes_transferred = 0; - DWORD recv_flags = flags; - int result = ::WSARecvFrom(impl.socket_, bufs, i, &bytes_transferred, - &recv_flags, sender_endp.data(), &ptr.get()->endpoint_size(), - ptr.get(), 0); - DWORD last_error = ::WSAGetLastError(); - if (result != 0 && last_error != WSA_IO_PENDING) - { - ptr.get()->on_immediate_completion(last_error, bytes_transferred); - ptr.release(); - } - else - { - ptr.get()->on_pending(); - ptr.release(); - } + start_receive_from_op(impl, bufs.buffers(), bufs.count(), + sender_endp, flags, &ptr.get()->endpoint_size(), ptr.get()); + ptr.release(); } // Wait until data can be received without blocking. @@ -1758,40 +1312,20 @@ public: const null_buffers&, endpoint_type& sender_endpoint, socket_base::message_flags flags, Handler handler) { - if (!is_open(impl)) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); - } - else - { - // Check if the reactor was already obtained from the io_service. - reactor_type* reactor = static_cast<reactor_type*>( - interlocked_compare_exchange_pointer( - reinterpret_cast<void**>(&reactor_), 0, 0)); - if (!reactor) - { - reactor = &(boost::asio::use_service<reactor_type>( - this->get_io_service())); - interlocked_exchange_pointer( - reinterpret_cast<void**>(&reactor_), reactor); - } + // Allocate and construct an operation to wrap the handler. + typedef null_buffers_op<Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, handler); - // Reset endpoint since it can be given no sensible value at this time. - sender_endpoint = endpoint_type(); + // Reset endpoint since it can be given no sensible value at this time. + sender_endpoint = endpoint_type(); - if (flags & socket_base::message_out_of_band) - { - reactor->start_except_op(impl.socket_, impl.reactor_data_, - null_buffers_operation<Handler>(this->get_io_service(), handler)); - } - else - { - reactor->start_read_op(impl.socket_, impl.reactor_data_, - null_buffers_operation<Handler>(this->get_io_service(), handler), - false); - } - } + start_reactor_op(impl, + (flags & socket_base::message_out_of_band) + ? reactor::except_op : reactor::read_op, + ptr.get()); + ptr.release(); } // Accept a new connection. @@ -1852,32 +1386,27 @@ public: } template <typename Socket, typename Handler> - class accept_operation - : public operation + class accept_op : public operation { public: - accept_operation(win_iocp_io_service& io_service, - socket_type socket, socket_type new_socket, Socket& peer, - const protocol_type& protocol, endpoint_type* peer_endpoint, - bool enable_connection_aborted, Handler handler) - : operation(io_service, - &accept_operation<Socket, Handler>::do_completion_impl, - &accept_operation<Socket, Handler>::destroy_impl), - io_service_(io_service), + accept_op(win_iocp_io_service& iocp_service, socket_type socket, + Socket& peer, const protocol_type& protocol, + endpoint_type* peer_endpoint, bool enable_connection_aborted, + Handler handler) + : operation(&accept_op::do_complete), + iocp_service_(iocp_service), socket_(socket), - new_socket_(new_socket), peer_(peer), protocol_(protocol), peer_endpoint_(peer_endpoint), - work_(io_service.get_io_service()), enable_connection_aborted_(enable_connection_aborted), handler_(handler) { } - socket_type new_socket() + socket_holder& new_socket() { - return new_socket_.get(); + return new_socket_; } void* output_buffer() @@ -1890,166 +1419,145 @@ public: return sizeof(sockaddr_storage_type) + 16; } - private: - static void do_completion_impl(operation* op, DWORD last_error, size_t) + static void do_complete(io_service_impl* owner, operation* base, + boost::system::error_code ec, std::size_t /*bytes_transferred*/) { - // Take ownership of the operation object. - typedef accept_operation<Socket, Handler> op_type; - op_type* handler_op(static_cast<op_type*>(op)); - typedef handler_alloc_traits<Handler, op_type> alloc_traits; - handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); + // Take ownership of the handler object. + accept_op* o(static_cast<accept_op*>(base)); + typedef handler_alloc_traits<Handler, accept_op> alloc_traits; + handler_ptr<alloc_traits> ptr(o->handler_, o); - // Map Windows error ERROR_NETNAME_DELETED to connection_aborted. - if (last_error == ERROR_NETNAME_DELETED) + // Make the upcall if required. + if (owner) { - last_error = WSAECONNABORTED; - } + // Map Windows error ERROR_NETNAME_DELETED to connection_aborted. + if (ec.value() == ERROR_NETNAME_DELETED) + { + ec = boost::asio::error::connection_aborted; + } - // Restart the accept operation if we got the connection_aborted error - // and the enable_connection_aborted socket option is not set. - if (last_error == WSAECONNABORTED - && !ptr.get()->enable_connection_aborted_) - { - // Reset OVERLAPPED structure. - ptr.get()->reset(); - - // Create a new socket for the next connection, since the AcceptEx call - // fails with WSAEINVAL if we try to reuse the same socket. - boost::system::error_code ec; - ptr.get()->new_socket_.reset(); - ptr.get()->new_socket_.reset(socket_ops::socket( - ptr.get()->protocol_.family(), ptr.get()->protocol_.type(), - ptr.get()->protocol_.protocol(), ec)); - if (ptr.get()->new_socket() != invalid_socket) + // Restart the accept operation if we got the connection_aborted error + // and the enable_connection_aborted socket option is not set. + if (ec == boost::asio::error::connection_aborted + && !o->enable_connection_aborted_) { - // Accept a connection. - DWORD bytes_read = 0; - BOOL result = ::AcceptEx(ptr.get()->socket_, ptr.get()->new_socket(), - ptr.get()->output_buffer(), 0, ptr.get()->address_length(), - ptr.get()->address_length(), &bytes_read, ptr.get()); - last_error = ::WSAGetLastError(); - - // Check if the operation completed immediately. - if (!result && last_error != WSA_IO_PENDING) + // Reset OVERLAPPED structure. + o->reset(); + + // Create a new socket for the next connection, since the AcceptEx + // call fails with WSAEINVAL if we try to reuse the same socket. + o->new_socket_.reset(); + o->new_socket_.reset(socket_ops::socket(o->protocol_.family(), + o->protocol_.type(), o->protocol_.protocol(), ec)); + if (o->new_socket_.get() != invalid_socket) { - if (last_error == ERROR_NETNAME_DELETED - || last_error == WSAECONNABORTED) + // Accept a connection. + DWORD bytes_read = 0; + BOOL result = ::AcceptEx(o->socket_, o->new_socket_.get(), + o->output_buffer(), 0, o->address_length(), + o->address_length(), &bytes_read, o); + DWORD last_error = ::WSAGetLastError(); + ec = boost::system::error_code(last_error, + boost::asio::error::get_system_category()); + + // Check if the operation completed immediately. + if (!result && last_error != WSA_IO_PENDING) { - // Post this handler so that operation will be restarted again. - ptr.get()->on_immediate_completion(last_error, 0); - ptr.release(); - return; + if (last_error == ERROR_NETNAME_DELETED + || last_error == WSAECONNABORTED) + { + // Post this handler so that operation will be restarted again. + o->iocp_service_.work_started(); + o->iocp_service_.on_completion(o, ec); + ptr.release(); + return; + } + else + { + // Operation already complete. Continue with rest of this + // handler. + } } else { - // Operation already complete. Continue with rest of this handler. + // Asynchronous operation has been successfully restarted. + o->iocp_service_.work_started(); + o->iocp_service_.on_pending(o); + ptr.release(); + return; } } + } + + // Get the address of the peer. + endpoint_type peer_endpoint; + if (!ec) + { + LPSOCKADDR local_addr = 0; + int local_addr_length = 0; + LPSOCKADDR remote_addr = 0; + int remote_addr_length = 0; + GetAcceptExSockaddrs(o->output_buffer(), 0, o->address_length(), + o->address_length(), &local_addr, &local_addr_length, + &remote_addr, &remote_addr_length); + if (static_cast<std::size_t>(remote_addr_length) + > peer_endpoint.capacity()) + { + ec = boost::asio::error::invalid_argument; + } else { - // Asynchronous operation has been successfully restarted. - ptr.get()->on_pending(); - ptr.release(); - return; + using namespace std; // For memcpy. + memcpy(peer_endpoint.data(), remote_addr, remote_addr_length); + peer_endpoint.resize(static_cast<std::size_t>(remote_addr_length)); } } - } - // Get the address of the peer. - endpoint_type peer_endpoint; - if (last_error == 0) - { - LPSOCKADDR local_addr = 0; - int local_addr_length = 0; - LPSOCKADDR remote_addr = 0; - int remote_addr_length = 0; - GetAcceptExSockaddrs(handler_op->output_buffer(), 0, - handler_op->address_length(), handler_op->address_length(), - &local_addr, &local_addr_length, &remote_addr, &remote_addr_length); - if (static_cast<std::size_t>(remote_addr_length) - > peer_endpoint.capacity()) - { - last_error = WSAEINVAL; - } - else + // Need to set the SO_UPDATE_ACCEPT_CONTEXT option so that getsockname + // and getpeername will work on the accepted socket. + if (!ec) { - using namespace std; // For memcpy. - memcpy(peer_endpoint.data(), remote_addr, remote_addr_length); - peer_endpoint.resize(static_cast<std::size_t>(remote_addr_length)); + SOCKET update_ctx_param = o->socket_; + socket_ops::setsockopt(o->new_socket_.get(), + SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, + &update_ctx_param, sizeof(SOCKET), ec); } - } - // Need to set the SO_UPDATE_ACCEPT_CONTEXT option so that getsockname - // and getpeername will work on the accepted socket. - if (last_error == 0) - { - SOCKET update_ctx_param = handler_op->socket_; - boost::system::error_code ec; - if (socket_ops::setsockopt(handler_op->new_socket_.get(), - SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, - &update_ctx_param, sizeof(SOCKET), ec) != 0) + // If the socket was successfully accepted, transfer ownership of the + // socket to the peer object. + if (!ec) { - last_error = ec.value(); + o->peer_.assign(o->protocol_, + native_type(o->new_socket_.get(), peer_endpoint), ec); + if (!ec) + o->new_socket_.release(); } - } - // If the socket was successfully accepted, transfer ownership of the - // socket to the peer object. - if (last_error == 0) - { - boost::system::error_code ec; - handler_op->peer_.assign(handler_op->protocol_, - native_type(handler_op->new_socket_.get(), peer_endpoint), ec); - if (ec) - last_error = ec.value(); - else - handler_op->new_socket_.release(); + // Pass endpoint back to caller. + if (o->peer_endpoint_) + *o->peer_endpoint_ = peer_endpoint; + + // Make a copy of the handler so that the memory can be deallocated + // before the upcall is made. Even if we're not about to make an + // upcall, a sub-object of the handler may be the true owner of the + // memory associated with the handler. Consequently, a local copy of + // the handler is required to ensure that any owning sub-object remains + // valid until after we have deallocated the memory here. + detail::binder1<Handler, boost::system::error_code> + handler(o->handler_, ec); + ptr.reset(); + boost::asio::detail::fenced_block b; + boost_asio_handler_invoke_helpers::invoke(handler, handler); } - - // Pass endpoint back to caller. - if (handler_op->peer_endpoint_) - *handler_op->peer_endpoint_ = peer_endpoint; - - // Make a copy of the handler so that the memory can be deallocated before - // the upcall is made. - Handler handler(handler_op->handler_); - - // Free the memory associated with the handler. - ptr.reset(); - - // Call the handler. - boost::system::error_code ec(last_error, - boost::asio::error::get_system_category()); - boost_asio_handler_invoke_helpers::invoke( - detail::bind_handler(handler, ec), handler); - } - - static void destroy_impl(operation* op) - { - // Take ownership of the operation object. - typedef accept_operation<Socket, Handler> op_type; - op_type* handler_op(static_cast<op_type*>(op)); - typedef handler_alloc_traits<Handler, op_type> alloc_traits; - handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); - - // A sub-object of the handler may be the true owner of the memory - // associated with the handler. Consequently, a local copy of the handler - // is required to ensure that any owning sub-object remains valid until - // after we have deallocated the memory here. - Handler handler(handler_op->handler_); - (void)handler; - - // Free the memory associated with the handler. - ptr.reset(); } - win_iocp_io_service& io_service_; + private: + win_iocp_io_service& iocp_service_; socket_type socket_; socket_holder new_socket_; Socket& peer_; protocol_type protocol_; endpoint_type* peer_endpoint_; - boost::asio::io_service::work work_; unsigned char output_buffer_[(sizeof(sockaddr_storage_type) + 16) * 2]; bool enable_connection_aborted_; Handler handler_; @@ -2061,86 +1569,18 @@ public: void async_accept(implementation_type& impl, Socket& peer, endpoint_type* peer_endpoint, Handler handler) { - // Check whether acceptor has been initialised. - if (!is_open(impl)) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor)); - return; - } - - // Check that peer socket has not already been opened. - if (peer.is_open()) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::already_open)); - return; - } - -#if defined(BOOST_ASIO_ENABLE_CANCELIO) - // Update the ID of the thread from which cancellation is safe. - if (impl.safe_cancellation_thread_id_ == 0) - impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId(); - else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId()) - impl.safe_cancellation_thread_id_ = ~DWORD(0); -#endif // defined(BOOST_ASIO_ENABLE_CANCELIO) - - // Create a new socket for the connection. - boost::system::error_code ec; - socket_holder sock(socket_ops::socket(impl.protocol_.family(), - impl.protocol_.type(), impl.protocol_.protocol(), ec)); - if (sock.get() == invalid_socket) - { - this->get_io_service().post(bind_handler(handler, ec)); - return; - } - // Allocate and construct an operation to wrap the handler. - typedef accept_operation<Socket, Handler> value_type; + typedef accept_op<Socket, Handler> value_type; typedef handler_alloc_traits<Handler, value_type> alloc_traits; raw_handler_ptr<alloc_traits> raw_ptr(handler); - socket_type new_socket = sock.get(); bool enable_connection_aborted = (impl.flags_ & implementation_type::enable_connection_aborted); - handler_ptr<alloc_traits> ptr(raw_ptr, - iocp_service_, impl.socket_, new_socket, peer, impl.protocol_, - peer_endpoint, enable_connection_aborted, handler); - sock.release(); - - // Accept a connection. - DWORD bytes_read = 0; - BOOL result = ::AcceptEx(impl.socket_, ptr.get()->new_socket(), - ptr.get()->output_buffer(), 0, ptr.get()->address_length(), - ptr.get()->address_length(), &bytes_read, ptr.get()); - DWORD last_error = ::WSAGetLastError(); - - // Check if the operation completed immediately. - if (!result && last_error != WSA_IO_PENDING) - { - if (!enable_connection_aborted - && (last_error == ERROR_NETNAME_DELETED - || last_error == WSAECONNABORTED)) - { - // Post handler so that operation will be restarted again. We do not - // perform the AcceptEx again here to avoid the possibility of starving - // other handlers. - ptr.get()->on_immediate_completion(last_error, 0); - ptr.release(); - } - else - { - boost::asio::io_service::work work(this->get_io_service()); - ptr.reset(); - boost::system::error_code ec(last_error, - boost::asio::error::get_system_category()); - iocp_service_.post(bind_handler(handler, ec)); - } - } - else - { - ptr.get()->on_pending(); - ptr.release(); - } + handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_, impl.socket_, peer, + impl.protocol_, peer_endpoint, enable_connection_aborted, handler); + + start_accept_op(impl, peer.is_open(), ptr.get()->new_socket(), + ptr.get()->output_buffer(), ptr.get()->address_length(), ptr.get()); + ptr.release(); } // Connect the socket to the specified endpoint. @@ -2159,67 +1599,76 @@ public: return ec; } - template <typename Handler> - class connect_operation + class connect_op_base : public reactor_op { public: - connect_operation(socket_type socket, bool user_set_non_blocking, - boost::asio::io_service& io_service, Handler handler) - : socket_(socket), - user_set_non_blocking_(user_set_non_blocking), - io_service_(io_service), - work_(io_service), - handler_(handler) + connect_op_base(socket_type socket, func_type complete_func) + : reactor_op(&connect_op_base::do_perform, complete_func), + socket_(socket) { } - bool perform(boost::system::error_code& ec, - std::size_t& bytes_transferred) + static bool do_perform(reactor_op* base) { - bytes_transferred = 0; - - // Check whether the operation was successful. - if (ec) - return true; + connect_op_base* o(static_cast<connect_op_base*>(base)); // Get the error code from the connect operation. int connect_error = 0; size_t connect_error_len = sizeof(connect_error); - if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR, - &connect_error, &connect_error_len, ec) == socket_error_retval) + if (socket_ops::getsockopt(o->socket_, SOL_SOCKET, SO_ERROR, + &connect_error, &connect_error_len, o->ec_) == socket_error_retval) return true; - // If connection failed then post the handler with the error code. + // The connection failed so the handler will be posted with an error code. if (connect_error) { - ec = boost::system::error_code(connect_error, + o->ec_ = boost::system::error_code(connect_error, boost::asio::error::get_system_category()); - return true; } - // Revert socket to blocking mode unless the user requested otherwise. - if (!user_set_non_blocking_) - { - ioctl_arg_type non_blocking = 0; - if (socket_ops::ioctl(socket_, FIONBIO, &non_blocking, ec)) - return true; - } - - // Post the result of the successful connection operation. - ec = boost::system::error_code(); return true; } - void complete(const boost::system::error_code& ec, std::size_t) + private: + socket_type socket_; + }; + + template <typename Handler> + class connect_op : public connect_op_base + { + public: + connect_op(socket_type socket, Handler handler) + : connect_op_base(socket, &connect_op::do_complete), + handler_(handler) { - io_service_.post(bind_handler(handler_, ec)); + } + + static void do_complete(io_service_impl* owner, operation* base, + boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/) + { + // Take ownership of the handler object. + connect_op* o(static_cast<connect_op*>(base)); + typedef handler_alloc_traits<Handler, connect_op> alloc_traits; + handler_ptr<alloc_traits> ptr(o->handler_, o); + + // Make the upcall if required. + if (owner) + { + // Make a copy of the handler so that the memory can be deallocated + // before the upcall is made. Even if we're not about to make an + // upcall, a sub-object of the handler may be the true owner of the + // memory associated with the handler. Consequently, a local copy of + // the handler is required to ensure that any owning sub-object remains + // valid until after we have deallocated the memory here. + detail::binder1<Handler, boost::system::error_code> + handler(o->handler_, o->ec_); + ptr.reset(); + boost::asio::detail::fenced_block b; + boost_asio_handler_invoke_helpers::invoke(handler, handler); + } } private: - socket_type socket_; - bool user_set_non_blocking_; - boost::asio::io_service& io_service_; - boost::asio::io_service::work work_; Handler handler_; }; @@ -2228,86 +1677,216 @@ public: void async_connect(implementation_type& impl, const endpoint_type& peer_endpoint, Handler handler) { + // Allocate and construct an operation to wrap the handler. + typedef connect_op<Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_, handler); + + start_connect_op(impl, ptr.get(), peer_endpoint); + ptr.release(); + } + +private: + // Helper function to start an asynchronous send operation. + void start_send_op(implementation_type& impl, WSABUF* buffers, + std::size_t buffer_count, socket_base::message_flags flags, + bool noop, operation* op) + { + update_cancellation_thread_id(impl); + iocp_service_.work_started(); + + if (noop) + iocp_service_.on_completion(op); + else if (!is_open(impl)) + iocp_service_.on_completion(op, boost::asio::error::bad_descriptor); + else + { + DWORD bytes_transferred = 0; + int result = ::WSASend(impl.socket_, buffers, + buffer_count, &bytes_transferred, flags, op, 0); + DWORD last_error = ::WSAGetLastError(); + if (last_error == ERROR_PORT_UNREACHABLE) + last_error = WSAECONNREFUSED; + if (result != 0 && last_error != WSA_IO_PENDING) + iocp_service_.on_completion(op, last_error, bytes_transferred); + else + iocp_service_.on_pending(op); + } + } + + // Helper function to start an asynchronous send_to operation. + void start_send_to_op(implementation_type& impl, WSABUF* buffers, + std::size_t buffer_count, const endpoint_type& destination, + socket_base::message_flags flags, operation* op) + { + update_cancellation_thread_id(impl); + iocp_service_.work_started(); + if (!is_open(impl)) + iocp_service_.on_completion(op, boost::asio::error::bad_descriptor); + else { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor)); - return; + DWORD bytes_transferred = 0; + int result = ::WSASendTo(impl.socket_, buffers, buffer_count, + &bytes_transferred, flags, destination.data(), + static_cast<int>(destination.size()), op, 0); + DWORD last_error = ::WSAGetLastError(); + if (last_error == ERROR_PORT_UNREACHABLE) + last_error = WSAECONNREFUSED; + if (result != 0 && last_error != WSA_IO_PENDING) + iocp_service_.on_completion(op, last_error, bytes_transferred); + else + iocp_service_.on_pending(op); } + } -#if defined(BOOST_ASIO_ENABLE_CANCELIO) - // Update the ID of the thread from which cancellation is safe. - if (impl.safe_cancellation_thread_id_ == 0) - impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId(); - else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId()) - impl.safe_cancellation_thread_id_ = ~DWORD(0); -#endif // defined(BOOST_ASIO_ENABLE_CANCELIO) + // Helper function to start an asynchronous receive operation. + void start_receive_op(implementation_type& impl, WSABUF* buffers, + std::size_t buffer_count, socket_base::message_flags flags, + bool noop, operation* op) + { + update_cancellation_thread_id(impl); + iocp_service_.work_started(); - // Check if the reactor was already obtained from the io_service. - reactor_type* reactor = static_cast<reactor_type*>( - interlocked_compare_exchange_pointer( - reinterpret_cast<void**>(&reactor_), 0, 0)); - if (!reactor) + if (noop) + iocp_service_.on_completion(op); + else if (!is_open(impl)) + iocp_service_.on_completion(op, boost::asio::error::bad_descriptor); + else { - reactor = &(boost::asio::use_service<reactor_type>( - this->get_io_service())); - interlocked_exchange_pointer( - reinterpret_cast<void**>(&reactor_), reactor); + DWORD bytes_transferred = 0; + DWORD recv_flags = flags; + int result = ::WSARecv(impl.socket_, buffers, buffer_count, + &bytes_transferred, &recv_flags, op, 0); + DWORD last_error = ::WSAGetLastError(); + if (last_error == ERROR_NETNAME_DELETED) + last_error = WSAECONNRESET; + else if (last_error == ERROR_PORT_UNREACHABLE) + last_error = WSAECONNREFUSED; + if (result != 0 && last_error != WSA_IO_PENDING) + iocp_service_.on_completion(op, last_error, bytes_transferred); + else + iocp_service_.on_pending(op); } + } - // Mark the socket as non-blocking so that the connection will take place - // asynchronously. - ioctl_arg_type non_blocking = 1; - boost::system::error_code ec; - if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec)) + // Helper function to start an asynchronous receive_from operation. + void start_receive_from_op(implementation_type& impl, WSABUF* buffers, + std::size_t buffer_count, endpoint_type& sender_endpoint, + socket_base::message_flags flags, int* endpoint_size, operation* op) + { + update_cancellation_thread_id(impl); + iocp_service_.work_started(); + + if (!is_open(impl)) + iocp_service_.on_completion(op, boost::asio::error::bad_descriptor); + else { - this->get_io_service().post(bind_handler(handler, ec)); - return; + DWORD bytes_transferred = 0; + DWORD recv_flags = flags; + int result = ::WSARecvFrom(impl.socket_, buffers, + buffer_count, &bytes_transferred, &recv_flags, + sender_endpoint.data(), endpoint_size, op, 0); + DWORD last_error = ::WSAGetLastError(); + if (last_error == ERROR_PORT_UNREACHABLE) + last_error = WSAECONNREFUSED; + if (result != 0 && last_error != WSA_IO_PENDING) + iocp_service_.on_completion(op, last_error, bytes_transferred); + else + iocp_service_.on_pending(op); } + } - // Start the connect operation. - if (socket_ops::connect(impl.socket_, peer_endpoint.data(), - peer_endpoint.size(), ec) == 0) + // Helper function to start an asynchronous receive_from operation. + void start_accept_op(implementation_type& impl, + bool peer_is_open, socket_holder& new_socket, + void* output_buffer, DWORD address_length, operation* op) + { + update_cancellation_thread_id(impl); + iocp_service_.work_started(); + + if (!is_open(impl)) + iocp_service_.on_completion(op, boost::asio::error::bad_descriptor); + else if (peer_is_open) + iocp_service_.on_completion(op, boost::asio::error::already_open); + else { - // Revert socket to blocking mode unless the user requested otherwise. - if (!(impl.flags_ & implementation_type::user_set_non_blocking)) + boost::system::error_code ec; + new_socket.reset(socket_ops::socket(impl.protocol_.family(), + impl.protocol_.type(), impl.protocol_.protocol(), ec)); + if (new_socket.get() == invalid_socket) + iocp_service_.on_completion(op, ec); + else { - non_blocking = 0; - socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec); + DWORD bytes_read = 0; + BOOL result = ::AcceptEx(impl.socket_, new_socket.get(), output_buffer, + 0, address_length, address_length, &bytes_read, op); + DWORD last_error = ::WSAGetLastError(); + if (!result && last_error != WSA_IO_PENDING) + iocp_service_.on_completion(op, last_error); + else + iocp_service_.on_pending(op); } - - // The connect operation has finished successfully so we need to post the - // handler immediately. - this->get_io_service().post(bind_handler(handler, ec)); } - else if (ec == boost::asio::error::in_progress - || ec == boost::asio::error::would_block) + } + + // Start an asynchronous read or write operation using the the reactor. + void start_reactor_op(implementation_type& impl, int op_type, reactor_op* op) + { + reactor& r = get_reactor(); + update_cancellation_thread_id(impl); + + if (is_open(impl)) { - // The connection is happening in the background, and we need to wait - // until the socket becomes writeable. - boost::shared_ptr<bool> completed(new bool(false)); - reactor->start_connect_op(impl.socket_, impl.reactor_data_, - connect_operation<Handler>( - impl.socket_, - (impl.flags_ & implementation_type::user_set_non_blocking) != 0, - this->get_io_service(), handler)); + r.start_op(op_type, impl.socket_, impl.reactor_data_, op, false); + return; } else + op->ec_ = boost::asio::error::bad_descriptor; + + iocp_service_.post_immediate_completion(op); + } + + // Start the asynchronous connect operation using the reactor. + void start_connect_op(implementation_type& impl, + reactor_op* op, const endpoint_type& peer_endpoint) + { + reactor& r = get_reactor(); + update_cancellation_thread_id(impl); + + if (is_open(impl)) { - // Revert socket to blocking mode unless the user requested otherwise. - if (!(impl.flags_ & implementation_type::user_set_non_blocking)) + ioctl_arg_type non_blocking = 1; + if (!socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, op->ec_)) { - non_blocking = 0; - boost::system::error_code ignored_ec; - socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec); - } + if (socket_ops::connect(impl.socket_, peer_endpoint.data(), + peer_endpoint.size(), op->ec_) != 0) + { + if (!op->ec_ + && !(impl.flags_ & implementation_type::user_set_non_blocking)) + { + non_blocking = 0; + socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, op->ec_); + } - // The connect operation has failed, so post the handler immediately. - this->get_io_service().post(bind_handler(handler, ec)); + if (op->ec_ == boost::asio::error::in_progress + || op->ec_ == boost::asio::error::would_block) + { + op->ec_ = boost::system::error_code(); + r.start_op(reactor::connect_op, impl.socket_, + impl.reactor_data_, op, true); + return; + } + } + } } + else + op->ec_ = boost::asio::error::bad_descriptor; + + iocp_service_.post_immediate_completion(op); } -private: // Helper function to close a socket when the associated object is being // destroyed. void close_for_destruction(implementation_type& impl) @@ -2317,11 +1896,11 @@ private: // Check if the reactor was created, in which case we need to close the // socket on the reactor as well to cancel any operations that might be // running there. - reactor_type* reactor = static_cast<reactor_type*>( + reactor* r = static_cast<reactor*>( interlocked_compare_exchange_pointer( reinterpret_cast<void**>(&reactor_), 0, 0)); - if (reactor) - reactor->close_descriptor(impl.socket_, impl.reactor_data_); + if (r) + r->close_descriptor(impl.socket_, impl.reactor_data_); // The socket destructor must not block. If the user has changed the // linger option to block in the foreground, we will change it back to the @@ -2347,6 +1926,35 @@ private: } } + // Update the ID of the thread from which cancellation is safe. + void update_cancellation_thread_id(implementation_type& impl) + { +#if defined(BOOST_ASIO_ENABLE_CANCELIO) + if (impl.safe_cancellation_thread_id_ == 0) + impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId(); + else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId()) + impl.safe_cancellation_thread_id_ = ~DWORD(0); +#else // defined(BOOST_ASIO_ENABLE_CANCELIO) + (void)impl; +#endif // defined(BOOST_ASIO_ENABLE_CANCELIO) + } + + // Helper function to get the reactor. If no reactor has been created yet, a + // new one is obtained from the io_service and a pointer to it is cached in + // this service. + reactor& get_reactor() + { + reactor* r = static_cast<reactor*>( + interlocked_compare_exchange_pointer( + reinterpret_cast<void**>(&reactor_), 0, 0)); + if (!r) + { + r = &(use_service<reactor>(io_service_)); + interlocked_exchange_pointer(reinterpret_cast<void**>(&reactor_), r); + } + return *r; + } + // Helper function to emulate InterlockedCompareExchangePointer functionality // for: // - very old Platform SDKs; and @@ -2375,13 +1983,16 @@ private: #endif } + // The io_service used to obtain the reactor, if required. + boost::asio::io_service& io_service_; + // The IOCP service used for running asynchronous operations and dispatching // handlers. win_iocp_io_service& iocp_service_; // The reactor used for performing connect operations. This object is created // only if needed. - reactor_type* reactor_; + reactor* reactor_; // Mutex to protect access to the linked list of implementations. boost::asio::detail::mutex mutex_; |