diff options
Diffstat (limited to '3rdParty/Boost/src/boost/asio/detail/win_iocp_io_service.hpp')
-rw-r--r-- | 3rdParty/Boost/src/boost/asio/detail/win_iocp_io_service.hpp | 545 |
1 files changed, 230 insertions, 315 deletions
diff --git a/3rdParty/Boost/src/boost/asio/detail/win_iocp_io_service.hpp b/3rdParty/Boost/src/boost/asio/detail/win_iocp_io_service.hpp index 1dfb9a1..83b16bf 100644 --- a/3rdParty/Boost/src/boost/asio/detail/win_iocp_io_service.hpp +++ b/3rdParty/Boost/src/boost/asio/detail/win_iocp_io_service.hpp @@ -29,122 +29,37 @@ #include <boost/asio/io_service.hpp> #include <boost/asio/detail/call_stack.hpp> +#include <boost/asio/detail/completion_handler.hpp> +#include <boost/asio/detail/fenced_block.hpp> #include <boost/asio/detail/handler_alloc_helpers.hpp> #include <boost/asio/detail/handler_invoke_helpers.hpp> +#include <boost/asio/detail/mutex.hpp> +#include <boost/asio/detail/op_queue.hpp> #include <boost/asio/detail/service_base.hpp> #include <boost/asio/detail/socket_types.hpp> -#include <boost/asio/detail/timer_queue.hpp> -#include <boost/asio/detail/mutex.hpp> +#include <boost/asio/detail/timer_op.hpp> +#include <boost/asio/detail/timer_queue_base.hpp> +#include <boost/asio/detail/timer_queue_fwd.hpp> +#include <boost/asio/detail/timer_queue_set.hpp> +#include <boost/asio/detail/win_iocp_operation.hpp> namespace boost { namespace asio { namespace detail { +class timer_op; + class win_iocp_io_service : public boost::asio::detail::service_base<win_iocp_io_service> { public: - // Base class for all operations. A function pointer is used instead of - // virtual functions to avoid the associated overhead. - // - // This class inherits from OVERLAPPED so that we can downcast to get back to - // the operation pointer from the LPOVERLAPPED out parameter of - // GetQueuedCompletionStatus. - class operation; - friend class operation; - class operation - : public OVERLAPPED - { - public: - typedef void (*invoke_func_type)(operation*, DWORD, size_t); - typedef void (*destroy_func_type)(operation*); - - operation(win_iocp_io_service& iocp_service, - invoke_func_type invoke_func, destroy_func_type destroy_func) - : iocp_service_(iocp_service), - ready_(0), - last_error_(~DWORD(0)), - bytes_transferred_(0), - invoke_func_(invoke_func), - destroy_func_(destroy_func) - { - Internal = 0; - InternalHigh = 0; - Offset = 0; - OffsetHigh = 0; - hEvent = 0; - - ::InterlockedIncrement(&iocp_service_.outstanding_operations_); - } - - void reset() - { - Internal = 0; - InternalHigh = 0; - Offset = 0; - OffsetHigh = 0; - hEvent = 0; - ready_ = 0; - last_error_ = ~DWORD(0); - bytes_transferred_ = 0; - } - - void on_pending() - { - if (::InterlockedCompareExchange(&ready_, 1, 0) == 1) - iocp_service_.post_completion(this, last_error_, bytes_transferred_); - } - - void on_immediate_completion(DWORD last_error, DWORD bytes_transferred) - { - ready_ = 1; - iocp_service_.post_completion(this, last_error, bytes_transferred); - } - - bool on_completion(DWORD last_error, DWORD bytes_transferred) - { - if (last_error_ == ~DWORD(0)) - { - last_error_ = last_error; - bytes_transferred_ = bytes_transferred; - } - - if (::InterlockedCompareExchange(&ready_, 1, 0) == 1) - { - invoke_func_(this, last_error_, bytes_transferred_); - return true; - } - - return false; - } - - void destroy() - { - destroy_func_(this); - } - - protected: - // Prevent deletion through this type. - ~operation() - { - ::InterlockedDecrement(&iocp_service_.outstanding_operations_); - } - - private: - win_iocp_io_service& iocp_service_; - long ready_; - DWORD last_error_; - DWORD bytes_transferred_; - invoke_func_type invoke_func_; - destroy_func_type destroy_func_; - }; + typedef win_iocp_operation operation; // Constructor. win_iocp_io_service(boost::asio::io_service& io_service) : boost::asio::detail::service_base<win_iocp_io_service>(io_service), iocp_(), outstanding_work_(0), - outstanding_operations_(0), stopped_(0), shutdown_(0), timer_thread_(0), @@ -172,24 +87,34 @@ public: { ::InterlockedExchange(&shutdown_, 1); - while (::InterlockedExchangeAdd(&outstanding_operations_, 0) > 0) + while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0) { - DWORD bytes_transferred = 0; -#if defined(WINVER) && (WINVER < 0x0500) - DWORD completion_key = 0; -#else - DWORD_PTR completion_key = 0; -#endif - LPOVERLAPPED overlapped = 0; - ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred, - &completion_key, &overlapped, INFINITE); - if (overlapped) - static_cast<operation*>(overlapped)->destroy(); + op_queue<operation> ops; + timer_queues_.get_all_timers(ops); + ops.push(completed_ops_); + if (!ops.empty()) + { + while (operation* op = ops.front()) + { + ops.pop(); + ::InterlockedDecrement(&outstanding_work_); + op->destroy(); + } + } + else + { + DWORD bytes_transferred = 0; + dword_ptr_t completion_key = 0; + LPOVERLAPPED overlapped = 0; + ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred, + &completion_key, &overlapped, max_timeout); + if (overlapped) + { + ::InterlockedDecrement(&outstanding_work_); + static_cast<operation*>(overlapped)->destroy(); + } + } } - - for (std::size_t i = 0; i < timer_queues_.size(); ++i) - timer_queues_[i]->destroy_timers(); - timer_queues_.clear(); } // Initialise the task. Nothing to do here. @@ -219,6 +144,7 @@ public: { if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) { + stop(); ec = boost::system::error_code(); return 0; } @@ -237,6 +163,7 @@ public: { if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) { + stop(); ec = boost::system::error_code(); return 0; } @@ -251,6 +178,7 @@ public: { if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) { + stop(); ec = boost::system::error_code(); return 0; } @@ -269,6 +197,7 @@ public: { if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) { + stop(); ec = boost::system::error_code(); return 0; } @@ -319,7 +248,10 @@ public: void dispatch(Handler handler) { if (call_stack<win_iocp_io_service>::contains(this)) + { + boost::asio::detail::fenced_block b; boost_asio_handler_invoke_helpers::invoke(handler, handler); + } else post(handler); } @@ -328,37 +260,128 @@ public: template <typename Handler> void post(Handler handler) { - // If the service has been shut down we silently discard the handler. - if (::InterlockedExchangeAdd(&shutdown_, 0) != 0) - return; - // Allocate and construct an operation to wrap the handler. - typedef handler_operation<Handler> value_type; + typedef completion_handler<Handler> value_type; typedef handler_alloc_traits<Handler, value_type> alloc_traits; raw_handler_ptr<alloc_traits> raw_ptr(handler); - handler_ptr<alloc_traits> ptr(raw_ptr, *this, handler); + handler_ptr<alloc_traits> ptr(raw_ptr, handler); + + post_immediate_completion(ptr.get()); + ptr.release(); + } + + // Request invocation of the given operation and return immediately. Assumes + // that work_started() has not yet been called for the operation. + void post_immediate_completion(operation* op) + { + work_started(); + post_deferred_completion(op); + } + + // Request invocation of the given operation and return immediately. Assumes + // that work_started() was previously called for the operation. + void post_deferred_completion(operation* op) + { + // Flag the operation as ready. + op->ready_ = 1; // Enqueue the operation on the I/O completion port. - ptr.get()->on_immediate_completion(0, 0); + if (!::PostQueuedCompletionStatus(iocp_.handle, + 0, overlapped_contains_result, op)) + { + // Out of resources. Put on completed queue instead. + boost::asio::detail::mutex::scoped_lock lock(timer_mutex_); + completed_ops_.push(op); + } + } - // Operation has been successfully posted. - ptr.release(); + // Request invocation of the given operation and return immediately. Assumes + // that work_started() was previously called for the operations. + void post_deferred_completions(op_queue<operation>& ops) + { + while (operation* op = ops.front()) + { + ops.pop(); + + // Flag the operation as ready. + op->ready_ = 1; + + // Enqueue the operation on the I/O completion port. + if (!::PostQueuedCompletionStatus(iocp_.handle, + 0, overlapped_contains_result, op)) + { + // Out of resources. Put on completed queue instead. + boost::asio::detail::mutex::scoped_lock lock(timer_mutex_); + completed_ops_.push(op); + completed_ops_.push(ops); + } + } } - // Request invocation of the given OVERLAPPED-derived operation. - void post_completion(operation* op, DWORD op_last_error, - DWORD bytes_transferred) + // Called after starting an overlapped I/O operation that did not complete + // immediately. The caller must have already called work_started() prior to + // starting the operation. + void on_pending(operation* op) { + if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1) + { + // Enqueue the operation on the I/O completion port. + if (!::PostQueuedCompletionStatus(iocp_.handle, + 0, overlapped_contains_result, op)) + { + // Out of resources. Put on completed queue instead. + boost::asio::detail::mutex::scoped_lock lock(timer_mutex_); + completed_ops_.push(op); + } + } + } + + // Called after starting an overlapped I/O operation that completed + // immediately. The caller must have already called work_started() prior to + // starting the operation. + void on_completion(operation* op, + DWORD last_error = 0, DWORD bytes_transferred = 0) + { + // Flag that the operation is ready for invocation. + op->ready_ = 1; + + // Store results in the OVERLAPPED structure. + op->Internal = reinterpret_cast<ulong_ptr_t>( + &boost::asio::error::get_system_category()); + op->Offset = last_error; + op->OffsetHigh = bytes_transferred; + // Enqueue the operation on the I/O completion port. if (!::PostQueuedCompletionStatus(iocp_.handle, - bytes_transferred, op_last_error, op)) + 0, overlapped_contains_result, op)) { - DWORD last_error = ::GetLastError(); - boost::system::system_error e( - boost::system::error_code(last_error, - boost::asio::error::get_system_category()), - "pqcs"); - boost::throw_exception(e); + // Out of resources. Put on completed queue instead. + boost::asio::detail::mutex::scoped_lock lock(timer_mutex_); + completed_ops_.push(op); + } + } + + // Called after starting an overlapped I/O operation that completed + // immediately. The caller must have already called work_started() prior to + // starting the operation. + void on_completion(operation* op, + const boost::system::error_code& ec, DWORD bytes_transferred = 0) + { + // Flag that the operation is ready for invocation. + op->ready_ = 1; + + // Store results in the OVERLAPPED structure. + op->Internal = reinterpret_cast<ulong_ptr_t>(&ec.category()); + op->Offset = ec.value(); + op->OffsetHigh = bytes_transferred; + + // Enqueue the operation on the I/O completion port. + if (!::PostQueuedCompletionStatus(iocp_.handle, + 0, overlapped_contains_result, op)) + { + // Out of resources. Put on completed queue instead. + boost::asio::detail::mutex::scoped_lock lock(timer_mutex_); + completed_ops_.push(op); } } @@ -367,7 +390,7 @@ public: void add_timer_queue(timer_queue<Time_Traits>& timer_queue) { boost::asio::detail::mutex::scoped_lock lock(timer_mutex_); - timer_queues_.push_back(&timer_queue); + timer_queues_.insert(&timer_queue); } // Remove a timer queue from the service. @@ -375,36 +398,28 @@ public: void remove_timer_queue(timer_queue<Time_Traits>& timer_queue) { boost::asio::detail::mutex::scoped_lock lock(timer_mutex_); - for (std::size_t i = 0; i < timer_queues_.size(); ++i) - { - if (timer_queues_[i] == &timer_queue) - { - timer_queues_.erase(timer_queues_.begin() + i); - return; - } - } + timer_queues_.erase(&timer_queue); } - // Schedule a timer in the given timer queue to expire at the specified - // absolute time. The handler object will be invoked when the timer expires. - template <typename Time_Traits, typename Handler> + // Schedule a new operation in the given timer queue to expire at the + // specified absolute time. + template <typename Time_Traits> void schedule_timer(timer_queue<Time_Traits>& timer_queue, - const typename Time_Traits::time_type& time, Handler handler, void* token) + const typename Time_Traits::time_type& time, timer_op* op, void* token) { // If the service has been shut down we silently discard the timer. if (::InterlockedExchangeAdd(&shutdown_, 0) != 0) return; boost::asio::detail::mutex::scoped_lock lock(timer_mutex_); - if (timer_queue.enqueue_timer(time, handler, token)) + bool interrupt = timer_queue.enqueue_timer(time, op, token); + work_started(); + if (interrupt && !timer_interrupt_issued_) { - if (!timer_interrupt_issued_) - { - timer_interrupt_issued_ = true; - lock.unlock(); - ::PostQueuedCompletionStatus(iocp_.handle, - 0, steal_timer_dispatching, 0); - } + timer_interrupt_issued_ = true; + lock.unlock(); + ::PostQueuedCompletionStatus(iocp_.handle, + 0, steal_timer_dispatching, 0); } } @@ -418,7 +433,9 @@ public: return 0; boost::asio::detail::mutex::scoped_lock lock(timer_mutex_); - std::size_t n = timer_queue.cancel_timer(token); + op_queue<operation> ops; + std::size_t n = timer_queue.cancel_timer(token, ops); + post_deferred_completions(ops); if (n > 0 && !timer_interrupt_issued_) { timer_interrupt_issued_ = true; @@ -430,6 +447,14 @@ public: } private: +#if defined(WINVER) && (WINVER < 0x0500) + typedef DWORD dword_ptr_t; + typedef ULONG ulong_ptr_t; +#else // defined(WINVER) && (WINVER < 0x0500) + typedef DWORD_PTR dword_ptr_t; + typedef ULONG_PTR ulong_ptr_t; +#endif // defined(WINVER) && (WINVER < 0x0500) + // Dequeues at most one operation from the I/O completion port, and then // executes it. Returns the number of operations that were dequeued (i.e. // either 0 or 1). @@ -454,11 +479,7 @@ private: // Get the next operation from the queue. DWORD bytes_transferred = 0; -#if defined(WINVER) && (WINVER < 0x0500) - DWORD completion_key = 0; -#else - DWORD_PTR completion_key = 0; -#endif + dword_ptr_t completion_key = 0; LPOVERLAPPED overlapped = 0; ::SetLastError(0); BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred, @@ -468,32 +489,11 @@ private: // Dispatch any pending timers. if (dispatching_timers) { - try - { - boost::asio::detail::mutex::scoped_lock lock(timer_mutex_); - if (!timer_queues_.empty()) - { - timer_queues_copy_ = timer_queues_; - for (std::size_t i = 0; i < timer_queues_copy_.size(); ++i) - { - timer_queues_copy_[i]->dispatch_timers(); - timer_queues_copy_[i]->dispatch_cancellations(); - timer_queues_copy_[i]->complete_timers(); - } - } - } - catch (...) - { - // Transfer responsibility for dispatching timers to another thread. - if (::InterlockedCompareExchange(&timer_thread_, - 0, this_thread_id) == this_thread_id) - { - ::PostQueuedCompletionStatus(iocp_.handle, - 0, transfer_timer_dispatching, 0); - } - - throw; - } + boost::asio::detail::mutex::scoped_lock lock(timer_mutex_); + op_queue<operation> ops; + ops.push(completed_ops_); + timer_queues_.get_ready_timers(ops); + post_deferred_completions(ops); } if (!ok && overlapped == 0) @@ -522,11 +522,9 @@ private: } else if (overlapped) { - // We may have been passed a last_error value in the completion_key. - if (last_error == 0) - { - last_error = completion_key; - } + operation* op = static_cast<operation*>(overlapped); + boost::system::error_code result_ec(last_error, + boost::asio::error::get_system_category()); // Transfer responsibility for dispatching timers to another thread. if (dispatching_timers && ::InterlockedCompareExchange( @@ -536,14 +534,35 @@ private: 0, transfer_timer_dispatching, 0); } - // Ensure that the io_service does not exit due to running out of work - // while we make the upcall. - auto_work work(*this); + // We may have been passed the last_error and bytes_transferred in the + // OVERLAPPED structure itself. + if (completion_key == overlapped_contains_result) + { + result_ec = boost::system::error_code(static_cast<int>(op->Offset), + *reinterpret_cast<boost::system::error_category*>(op->Internal)); + bytes_transferred = op->OffsetHigh; + } - // Dispatch the operation. - operation* op = static_cast<operation*>(overlapped); - if (op->on_completion(last_error, bytes_transferred)) + // Otherwise ensure any result has been saved into the OVERLAPPED + // structure. + else { + op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category()); + op->Offset = result_ec.value(); + op->OffsetHigh = bytes_transferred; + } + + // Dispatch the operation only if ready. The operation may not be ready + // if the initiating function (e.g. a call to WSARecv) has not yet + // returned. This is because the initiating function still wants access + // to the operation's OVERLAPPED structure. + if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1) + { + // Ensure the count of outstanding work is decremented on block exit. + work_finished_on_block_exit on_exit = { this }; + (void)on_exit; + + op->complete(*this, result_ec, bytes_transferred); ec = boost::system::error_code(); return 1; } @@ -588,126 +607,23 @@ private: } } - // Check if all timer queues are empty. - bool all_timer_queues_are_empty() const - { - for (std::size_t i = 0; i < timer_queues_.size(); ++i) - if (!timer_queues_[i]->empty()) - return false; - return true; - } - // Get the timeout value for the GetQueuedCompletionStatus call. The timeout // value is returned as a number of milliseconds. We will wait no longer than // 1000 milliseconds. DWORD get_timeout() { - if (all_timer_queues_are_empty()) - return max_timeout; - - boost::posix_time::time_duration minimum_wait_duration - = boost::posix_time::milliseconds(max_timeout); - - for (std::size_t i = 0; i < timer_queues_.size(); ++i) - { - boost::posix_time::time_duration wait_duration - = timer_queues_[i]->wait_duration(); - if (wait_duration < minimum_wait_duration) - minimum_wait_duration = wait_duration; - } - - if (minimum_wait_duration > boost::posix_time::time_duration()) - { - int milliseconds = minimum_wait_duration.total_milliseconds(); - return static_cast<DWORD>(milliseconds > 0 ? milliseconds : 1); - } - else - { - return 0; - } + return timer_queues_.wait_duration_msec(max_timeout); } - struct auto_work + // Helper class to call work_finished() on block exit. + struct work_finished_on_block_exit { - auto_work(win_iocp_io_service& io_service) - : io_service_(io_service) + ~work_finished_on_block_exit() { - io_service_.work_started(); + io_service_->work_finished(); } - ~auto_work() - { - io_service_.work_finished(); - } - - private: - win_iocp_io_service& io_service_; - }; - - template <typename Handler> - struct handler_operation - : public operation - { - handler_operation(win_iocp_io_service& io_service, - Handler handler) - : operation(io_service, &handler_operation<Handler>::do_completion_impl, - &handler_operation<Handler>::destroy_impl), - io_service_(io_service), - handler_(handler) - { - io_service_.work_started(); - } - - ~handler_operation() - { - io_service_.work_finished(); - } - - private: - // Prevent copying and assignment. - handler_operation(const handler_operation&); - void operator=(const handler_operation&); - - static void do_completion_impl(operation* op, DWORD, size_t) - { - // Take ownership of the operation object. - typedef handler_operation<Handler> op_type; - op_type* handler_op(static_cast<op_type*>(op)); - typedef handler_alloc_traits<Handler, op_type> alloc_traits; - handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); - - // Make a copy of the handler so that the memory can be deallocated before - // the upcall is made. - Handler handler(handler_op->handler_); - - // Free the memory associated with the handler. - ptr.reset(); - - // Make the upcall. - boost_asio_handler_invoke_helpers::invoke(handler, handler); - } - - static void destroy_impl(operation* op) - { - // Take ownership of the operation object. - typedef handler_operation<Handler> op_type; - op_type* handler_op(static_cast<op_type*>(op)); - typedef handler_alloc_traits<Handler, op_type> alloc_traits; - handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op); - - // A sub-object of the handler may be the true owner of the memory - // associated with the handler. Consequently, a local copy of the handler - // is required to ensure that any owning sub-object remains valid until - // after we have deallocated the memory here. - Handler handler(handler_op->handler_); - (void)handler; - - // Free the memory associated with the handler. - ptr.reset(); - } - - win_iocp_io_service& io_service_; - Handler handler_; + win_iocp_io_service* io_service_; }; // The IO completion port used for queueing operations. @@ -721,10 +637,6 @@ private: // The count of unfinished work. long outstanding_work_; - // The count of unfinished operations. - long outstanding_operations_; - friend class operation; - // Flag to indicate whether the event loop has been stopped. long stopped_; @@ -742,7 +654,12 @@ private: // Completion key value to indicate that responsibility for dispatching // timers should be stolen from another thread. - steal_timer_dispatching = 2 + steal_timer_dispatching = 2, + + // Completion key value to indicate that an operation has posted with the + // original last_error and bytes_transferred values stored in the fields of + // the OVERLAPPED structure. + overlapped_contains_result = 3 }; // The thread that's currently in charge of dispatching timers. @@ -755,12 +672,10 @@ private: bool timer_interrupt_issued_; // The timer queues. - std::vector<timer_queue_base*> timer_queues_; + timer_queue_set timer_queues_; - // A copy of the timer queues, used when dispatching, cancelling and cleaning - // up timers. The copy is stored as a class data member to avoid unnecessary - // memory allocation. - std::vector<timer_queue_base*> timer_queues_copy_; + // The operations that are ready to dispatch. + op_queue<operation> completed_ops_; }; } // namespace detail |