diff options
Diffstat (limited to '3rdParty/Boost/src/boost/asio/detail/reactive_descriptor_service.hpp')
-rw-r--r-- | 3rdParty/Boost/src/boost/asio/detail/reactive_descriptor_service.hpp | 510 |
1 files changed, 227 insertions, 283 deletions
diff --git a/3rdParty/Boost/src/boost/asio/detail/reactive_descriptor_service.hpp b/3rdParty/Boost/src/boost/asio/detail/reactive_descriptor_service.hpp index 61f676b..536b96a 100644 --- a/3rdParty/Boost/src/boost/asio/detail/reactive_descriptor_service.hpp +++ b/3rdParty/Boost/src/boost/asio/detail/reactive_descriptor_service.hpp @@ -21,10 +21,13 @@ #include <boost/asio/error.hpp> #include <boost/asio/io_service.hpp> #include <boost/asio/detail/bind_handler.hpp> -#include <boost/asio/detail/handler_base_from_member.hpp> -#include <boost/asio/detail/noncopyable.hpp> -#include <boost/asio/detail/service_base.hpp> +#include <boost/asio/detail/buffer_sequence_adapter.hpp> #include <boost/asio/detail/descriptor_ops.hpp> +#include <boost/asio/detail/fenced_block.hpp> +#include <boost/asio/detail/noncopyable.hpp> +#include <boost/asio/detail/null_buffers_op.hpp> +#include <boost/asio/detail/reactor.hpp> +#include <boost/asio/detail/reactor_op.hpp> #if !defined(BOOST_WINDOWS) && !defined(__CYGWIN__) @@ -32,10 +35,7 @@ namespace boost { namespace asio { namespace detail { -template <typename Reactor> class reactive_descriptor_service - : public boost::asio::detail::service_base< - reactive_descriptor_service<Reactor> > { public: // The native type of a descriptor. @@ -55,22 +55,28 @@ public: private: // Only this service will have access to the internal values. - friend class reactive_descriptor_service<Reactor>; + friend class reactive_descriptor_service; // The native descriptor representation. int descriptor_; enum { - user_set_non_blocking = 1, // The user wants a non-blocking descriptor. - internal_non_blocking = 2 // The descriptor has been set non-blocking. + // The user wants a non-blocking descriptor. + user_set_non_blocking = 1, + + // The descriptor has been set non-blocking. + internal_non_blocking = 2, + + // Helper "flag" used to determine whether the descriptor is non-blocking. + non_blocking = user_set_non_blocking | internal_non_blocking }; // Flags indicating the current state of the descriptor. unsigned char flags_; // Per-descriptor data used by the reactor. - typename Reactor::per_descriptor_data reactor_data_; + reactor::per_descriptor_data reactor_data_; }; // The maximum number of buffers to support in a single operation. @@ -78,9 +84,8 @@ public: // Constructor. reactive_descriptor_service(boost::asio::io_service& io_service) - : boost::asio::detail::service_base< - reactive_descriptor_service<Reactor> >(io_service), - reactor_(boost::asio::use_service<Reactor>(io_service)) + : io_service_impl_(boost::asio::use_service<io_service_impl>(io_service)), + reactor_(boost::asio::use_service<reactor>(io_service)) { reactor_.init_task(); } @@ -209,19 +214,31 @@ public: return ec; } - if (command.name() == static_cast<int>(FIONBIO)) + descriptor_ops::ioctl(impl.descriptor_, command.name(), + static_cast<ioctl_arg_type*>(command.data()), ec); + + // When updating the non-blocking mode we always perform the ioctl syscall, + // even if the flags would otherwise indicate that the descriptor is + // already in the correct state. This ensures that the underlying + // descriptor is put into the state that has been requested by the user. If + // the ioctl syscall was successful then we need to update the flags to + // match. + if (!ec && command.name() == static_cast<int>(FIONBIO)) { - if (command.get()) + if (*static_cast<ioctl_arg_type*>(command.data())) + { impl.flags_ |= implementation_type::user_set_non_blocking; + } else - impl.flags_ &= ~implementation_type::user_set_non_blocking; - ec = boost::system::error_code(); - } - else - { - descriptor_ops::ioctl(impl.descriptor_, command.name(), - static_cast<ioctl_arg_type*>(command.data()), ec); + { + // Clearing the non-blocking mode always overrides any internally-set + // non-blocking flag. Any subsequent asynchronous operations will need + // to re-enable non-blocking I/O. + impl.flags_ &= ~(implementation_type::user_set_non_blocking + | implementation_type::internal_non_blocking); + } } + return ec; } @@ -236,47 +253,22 @@ public: return 0; } - // Copy buffers into array. - descriptor_ops::buf bufs[max_buffers]; - typename ConstBufferSequence::const_iterator iter = buffers.begin(); - typename ConstBufferSequence::const_iterator end = buffers.end(); - size_t i = 0; - size_t total_buffer_size = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::const_buffer buffer(*iter); - descriptor_ops::init_buf(bufs[i], - boost::asio::buffer_cast<const void*>(buffer), - boost::asio::buffer_size(buffer)); - total_buffer_size += boost::asio::buffer_size(buffer); - } + buffer_sequence_adapter<boost::asio::const_buffer, + ConstBufferSequence> bufs(buffers); // A request to read_some 0 bytes on a stream is a no-op. - if (total_buffer_size == 0) + if (bufs.all_empty()) { ec = boost::system::error_code(); return 0; } - // Make descriptor non-blocking if user wants non-blocking. - if (impl.flags_ & implementation_type::user_set_non_blocking) - { - if (!(impl.flags_ & implementation_type::internal_non_blocking)) - { - ioctl_arg_type non_blocking = 1; - if (descriptor_ops::ioctl(impl.descriptor_, - FIONBIO, &non_blocking, ec)) - return 0; - impl.flags_ |= implementation_type::internal_non_blocking; - } - } - // Send the data. for (;;) { // Try to complete the operation without blocking. int bytes_sent = descriptor_ops::gather_write( - impl.descriptor_, bufs, i, ec); + impl.descriptor_, bufs.buffers(), bufs.count(), ec); // Check if operation succeeded. if (bytes_sent >= 0) @@ -310,48 +302,31 @@ public: return 0; } - template <typename ConstBufferSequence, typename Handler> - class write_operation : - public handler_base_from_member<Handler> + template <typename ConstBufferSequence> + class write_op_base : public reactor_op { public: - write_operation(int descriptor, boost::asio::io_service& io_service, - const ConstBufferSequence& buffers, Handler handler) - : handler_base_from_member<Handler>(handler), + write_op_base(int descriptor, + const ConstBufferSequence& buffers, func_type complete_func) + : reactor_op(&write_op_base::do_perform, complete_func), descriptor_(descriptor), - io_service_(io_service), - work_(io_service), buffers_(buffers) { } - bool perform(boost::system::error_code& ec, - std::size_t& bytes_transferred) + static bool do_perform(reactor_op* base) { - // Check whether the operation was successful. - if (ec) - { - bytes_transferred = 0; - return true; - } + write_op_base* o(static_cast<write_op_base*>(base)); - // Copy buffers into array. - descriptor_ops::buf bufs[max_buffers]; - typename ConstBufferSequence::const_iterator iter = buffers_.begin(); - typename ConstBufferSequence::const_iterator end = buffers_.end(); - size_t i = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::const_buffer buffer(*iter); - descriptor_ops::init_buf(bufs[i], - boost::asio::buffer_cast<const void*>(buffer), - boost::asio::buffer_size(buffer)); - } + buffer_sequence_adapter<boost::asio::const_buffer, + ConstBufferSequence> bufs(o->buffers_); for (;;) { // Write the data. - int bytes = descriptor_ops::gather_write(descriptor_, bufs, i, ec); + boost::system::error_code ec; + int bytes = descriptor_ops::gather_write( + o->descriptor_, bufs.buffers(), bufs.count(), ec); // Retry operation if interrupted by signal. if (ec == boost::asio::error::interrupted) @@ -362,120 +337,89 @@ public: || ec == boost::asio::error::try_again) return false; - bytes_transferred = (bytes < 0 ? 0 : bytes); + o->ec_ = ec; + o->bytes_transferred_ = (bytes < 0 ? 0 : bytes); return true; } } - void complete(const boost::system::error_code& ec, - std::size_t bytes_transferred) - { - io_service_.post(bind_handler(this->handler_, ec, bytes_transferred)); - } - private: int descriptor_; - boost::asio::io_service& io_service_; - boost::asio::io_service::work work_; ConstBufferSequence buffers_; }; - // Start an asynchronous write. The data being sent must be valid for the - // lifetime of the asynchronous operation. template <typename ConstBufferSequence, typename Handler> - void async_write_some(implementation_type& impl, - const ConstBufferSequence& buffers, Handler handler) - { - if (!is_open(impl)) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); - } - else - { - // Determine total size of buffers. - typename ConstBufferSequence::const_iterator iter = buffers.begin(); - typename ConstBufferSequence::const_iterator end = buffers.end(); - size_t i = 0; - size_t total_buffer_size = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::const_buffer buffer(*iter); - total_buffer_size += boost::asio::buffer_size(buffer); - } - - // A request to read_some 0 bytes on a stream is a no-op. - if (total_buffer_size == 0) - { - this->get_io_service().post(bind_handler(handler, - boost::system::error_code(), 0)); - return; - } - - // Make descriptor non-blocking. - if (!(impl.flags_ & implementation_type::internal_non_blocking)) - { - ioctl_arg_type non_blocking = 1; - boost::system::error_code ec; - if (descriptor_ops::ioctl(impl.descriptor_, FIONBIO, &non_blocking, ec)) - { - this->get_io_service().post(bind_handler(handler, ec, 0)); - return; - } - impl.flags_ |= implementation_type::internal_non_blocking; - } - - reactor_.start_write_op(impl.descriptor_, impl.reactor_data_, - write_operation<ConstBufferSequence, Handler>( - impl.descriptor_, this->get_io_service(), buffers, handler)); - } - } - - template <typename Handler> - class null_buffers_operation : - public handler_base_from_member<Handler> + class write_op : public write_op_base<ConstBufferSequence> { public: - null_buffers_operation(boost::asio::io_service& io_service, Handler handler) - : handler_base_from_member<Handler>(handler), - work_(io_service) + write_op(int descriptor, + const ConstBufferSequence& buffers, Handler handler) + : write_op_base<ConstBufferSequence>( + descriptor, buffers, &write_op::do_complete), + handler_(handler) { } - bool perform(boost::system::error_code&, - std::size_t& bytes_transferred) + static void do_complete(io_service_impl* owner, operation* base, + boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/) { - bytes_transferred = 0; - return true; - } + // Take ownership of the handler object. + write_op* o(static_cast<write_op*>(base)); + typedef handler_alloc_traits<Handler, write_op> alloc_traits; + handler_ptr<alloc_traits> ptr(o->handler_, o); - void complete(const boost::system::error_code& ec, - std::size_t bytes_transferred) - { - work_.get_io_service().post(bind_handler( - this->handler_, ec, bytes_transferred)); + // Make the upcall if required. + if (owner) + { + // Make a copy of the handler so that the memory can be deallocated + // before the upcall is made. Even if we're not about to make an + // upcall, a sub-object of the handler may be the true owner of the + // memory associated with the handler. Consequently, a local copy of + // the handler is required to ensure that any owning sub-object remains + // valid until after we have deallocated the memory here. + detail::binder2<Handler, boost::system::error_code, std::size_t> + handler(o->handler_, o->ec_, o->bytes_transferred_); + ptr.reset(); + boost::asio::detail::fenced_block b; + boost_asio_handler_invoke_helpers::invoke(handler, handler); + } } private: - boost::asio::io_service::work work_; + Handler handler_; }; + // Start an asynchronous write. The data being sent must be valid for the + // lifetime of the asynchronous operation. + template <typename ConstBufferSequence, typename Handler> + void async_write_some(implementation_type& impl, + const ConstBufferSequence& buffers, Handler handler) + { + // Allocate and construct an operation to wrap the handler. + typedef write_op<ConstBufferSequence, Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, impl.descriptor_, buffers, handler); + + start_op(impl, reactor::write_op, ptr.get(), true, + buffer_sequence_adapter<boost::asio::const_buffer, + ConstBufferSequence>::all_empty(buffers)); + ptr.release(); + } + // Start an asynchronous wait until data can be written without blocking. template <typename Handler> void async_write_some(implementation_type& impl, const null_buffers&, Handler handler) { - if (!is_open(impl)) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); - } - else - { - reactor_.start_write_op(impl.descriptor_, impl.reactor_data_, - null_buffers_operation<Handler>(this->get_io_service(), handler), - false); - } + // Allocate and construct an operation to wrap the handler. + typedef null_buffers_op<Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, handler); + + start_op(impl, reactor::write_op, ptr.get(), false, false); + ptr.release(); } // Read some data from the stream. Returns the number of bytes read. @@ -489,46 +433,22 @@ public: return 0; } - // Copy buffers into array. - descriptor_ops::buf bufs[max_buffers]; - typename MutableBufferSequence::const_iterator iter = buffers.begin(); - typename MutableBufferSequence::const_iterator end = buffers.end(); - size_t i = 0; - size_t total_buffer_size = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::mutable_buffer buffer(*iter); - descriptor_ops::init_buf(bufs[i], - boost::asio::buffer_cast<void*>(buffer), - boost::asio::buffer_size(buffer)); - total_buffer_size += boost::asio::buffer_size(buffer); - } + buffer_sequence_adapter<boost::asio::mutable_buffer, + MutableBufferSequence> bufs(buffers); // A request to read_some 0 bytes on a stream is a no-op. - if (total_buffer_size == 0) + if (bufs.all_empty()) { ec = boost::system::error_code(); return 0; } - // Make descriptor non-blocking if user wants non-blocking. - if (impl.flags_ & implementation_type::user_set_non_blocking) - { - if (!(impl.flags_ & implementation_type::internal_non_blocking)) - { - ioctl_arg_type non_blocking = 1; - if (descriptor_ops::ioctl(impl.descriptor_, FIONBIO, &non_blocking, ec)) - return 0; - impl.flags_ |= implementation_type::internal_non_blocking; - } - } - // Read some data. for (;;) { // Try to complete the operation without blocking. int bytes_read = descriptor_ops::scatter_read( - impl.descriptor_, bufs, i, ec); + impl.descriptor_, bufs.buffers(), bufs.count(), ec); // Check if operation succeeded. if (bytes_read > 0) @@ -569,48 +489,31 @@ public: return 0; } - template <typename MutableBufferSequence, typename Handler> - class read_operation : - public handler_base_from_member<Handler> + template <typename MutableBufferSequence> + class read_op_base : public reactor_op { public: - read_operation(int descriptor, boost::asio::io_service& io_service, - const MutableBufferSequence& buffers, Handler handler) - : handler_base_from_member<Handler>(handler), + read_op_base(int descriptor, + const MutableBufferSequence& buffers, func_type complete_func) + : reactor_op(&read_op_base::do_perform, complete_func), descriptor_(descriptor), - io_service_(io_service), - work_(io_service), buffers_(buffers) { } - bool perform(boost::system::error_code& ec, - std::size_t& bytes_transferred) + static bool do_perform(reactor_op* base) { - // Check whether the operation was successful. - if (ec) - { - bytes_transferred = 0; - return true; - } + read_op_base* o(static_cast<read_op_base*>(base)); - // Copy buffers into array. - descriptor_ops::buf bufs[max_buffers]; - typename MutableBufferSequence::const_iterator iter = buffers_.begin(); - typename MutableBufferSequence::const_iterator end = buffers_.end(); - size_t i = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::mutable_buffer buffer(*iter); - descriptor_ops::init_buf(bufs[i], - boost::asio::buffer_cast<void*>(buffer), - boost::asio::buffer_size(buffer)); - } + buffer_sequence_adapter<boost::asio::mutable_buffer, + MutableBufferSequence> bufs(o->buffers_); for (;;) { // Read some data. - int bytes = descriptor_ops::scatter_read(descriptor_, bufs, i, ec); + boost::system::error_code ec; + int bytes = descriptor_ops::scatter_read( + o->descriptor_, bufs.buffers(), bufs.count(), ec); if (bytes == 0) ec = boost::asio::error::eof; @@ -623,73 +526,75 @@ public: || ec == boost::asio::error::try_again) return false; - bytes_transferred = (bytes < 0 ? 0 : bytes); + o->ec_ = ec; + o->bytes_transferred_ = (bytes < 0 ? 0 : bytes); return true; } } - void complete(const boost::system::error_code& ec, - std::size_t bytes_transferred) - { - io_service_.post(bind_handler(this->handler_, ec, bytes_transferred)); - } - private: int descriptor_; - boost::asio::io_service& io_service_; - boost::asio::io_service::work work_; MutableBufferSequence buffers_; }; - // Start an asynchronous read. The buffer for the data being read must be - // valid for the lifetime of the asynchronous operation. template <typename MutableBufferSequence, typename Handler> - void async_read_some(implementation_type& impl, - const MutableBufferSequence& buffers, Handler handler) + class read_op : public read_op_base<MutableBufferSequence> { - if (!is_open(impl)) + public: + read_op(int descriptor, + const MutableBufferSequence& buffers, Handler handler) + : read_op_base<MutableBufferSequence>( + descriptor, buffers, &read_op::do_complete), + handler_(handler) { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); } - else + + static void do_complete(io_service_impl* owner, operation* base, + boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/) { - // Determine total size of buffers. - typename MutableBufferSequence::const_iterator iter = buffers.begin(); - typename MutableBufferSequence::const_iterator end = buffers.end(); - size_t i = 0; - size_t total_buffer_size = 0; - for (; iter != end && i < max_buffers; ++iter, ++i) - { - boost::asio::mutable_buffer buffer(*iter); - total_buffer_size += boost::asio::buffer_size(buffer); - } + // Take ownership of the handler object. + read_op* o(static_cast<read_op*>(base)); + typedef handler_alloc_traits<Handler, read_op> alloc_traits; + handler_ptr<alloc_traits> ptr(o->handler_, o); - // A request to read_some 0 bytes on a stream is a no-op. - if (total_buffer_size == 0) + // Make the upcall if required. + if (owner) { - this->get_io_service().post(bind_handler(handler, - boost::system::error_code(), 0)); - return; + // Make a copy of the handler so that the memory can be deallocated + // before the upcall is made. Even if we're not about to make an + // upcall, a sub-object of the handler may be the true owner of the + // memory associated with the handler. Consequently, a local copy of + // the handler is required to ensure that any owning sub-object remains + // valid until after we have deallocated the memory here. + detail::binder2<Handler, boost::system::error_code, std::size_t> + handler(o->handler_, o->ec_, o->bytes_transferred_); + ptr.reset(); + boost::asio::detail::fenced_block b; + boost_asio_handler_invoke_helpers::invoke(handler, handler); } + } - // Make descriptor non-blocking. - if (!(impl.flags_ & implementation_type::internal_non_blocking)) - { - ioctl_arg_type non_blocking = 1; - boost::system::error_code ec; - if (descriptor_ops::ioctl(impl.descriptor_, FIONBIO, &non_blocking, ec)) - { - this->get_io_service().post(bind_handler(handler, ec, 0)); - return; - } - impl.flags_ |= implementation_type::internal_non_blocking; - } + private: + Handler handler_; + }; - reactor_.start_read_op(impl.descriptor_, impl.reactor_data_, - read_operation<MutableBufferSequence, Handler>( - impl.descriptor_, this->get_io_service(), buffers, handler)); - } + // Start an asynchronous read. The buffer for the data being read must be + // valid for the lifetime of the asynchronous operation. + template <typename MutableBufferSequence, typename Handler> + void async_read_some(implementation_type& impl, + const MutableBufferSequence& buffers, Handler handler) + { + // Allocate and construct an operation to wrap the handler. + typedef read_op<MutableBufferSequence, Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, + impl.descriptor_, buffers, handler); + + start_op(impl, reactor::read_op, ptr.get(), true, + buffer_sequence_adapter<boost::asio::mutable_buffer, + MutableBufferSequence>::all_empty(buffers)); + ptr.release(); } // Wait until data can be read without blocking. @@ -697,22 +602,61 @@ public: void async_read_some(implementation_type& impl, const null_buffers&, Handler handler) { - if (!is_open(impl)) - { - this->get_io_service().post(bind_handler(handler, - boost::asio::error::bad_descriptor, 0)); - } - else + // Allocate and construct an operation to wrap the handler. + typedef null_buffers_op<Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, handler); + + start_op(impl, reactor::read_op, ptr.get(), false, false); + ptr.release(); + } + +private: + // Start the asynchronous operation. + void start_op(implementation_type& impl, int op_type, + reactor_op* op, bool non_blocking, bool noop) + { + if (!noop) { - reactor_.start_read_op(impl.descriptor_, impl.reactor_data_, - null_buffers_operation<Handler>(this->get_io_service(), handler), - false); + if (is_open(impl)) + { + if (is_non_blocking(impl) || set_non_blocking(impl, op->ec_)) + { + reactor_.start_op(op_type, impl.descriptor_, + impl.reactor_data_, op, non_blocking); + return; + } + } + else + op->ec_ = boost::asio::error::bad_descriptor; } + + io_service_impl_.post_immediate_completion(op); } -private: + // Determine whether the descriptor has been set non-blocking. + bool is_non_blocking(implementation_type& impl) const + { + return (impl.flags_ & implementation_type::non_blocking); + } + + // Set the internal non-blocking flag. + bool set_non_blocking(implementation_type& impl, + boost::system::error_code& ec) + { + ioctl_arg_type non_blocking = 1; + if (descriptor_ops::ioctl(impl.descriptor_, FIONBIO, &non_blocking, ec)) + return false; + impl.flags_ |= implementation_type::internal_non_blocking; + return true; + } + + // The io_service implementation used to post completions. + io_service_impl& io_service_impl_; + // The selector that performs event demultiplexing for the service. - Reactor& reactor_; + reactor& reactor_; }; } // namespace detail |