summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '3rdParty/Boost/src/boost/asio/detail/win_iocp_io_service.hpp')
-rw-r--r--3rdParty/Boost/src/boost/asio/detail/win_iocp_io_service.hpp545
1 files changed, 230 insertions, 315 deletions
diff --git a/3rdParty/Boost/src/boost/asio/detail/win_iocp_io_service.hpp b/3rdParty/Boost/src/boost/asio/detail/win_iocp_io_service.hpp
index 1dfb9a1..83b16bf 100644
--- a/3rdParty/Boost/src/boost/asio/detail/win_iocp_io_service.hpp
+++ b/3rdParty/Boost/src/boost/asio/detail/win_iocp_io_service.hpp
@@ -29,122 +29,37 @@
#include <boost/asio/io_service.hpp>
#include <boost/asio/detail/call_stack.hpp>
+#include <boost/asio/detail/completion_handler.hpp>
+#include <boost/asio/detail/fenced_block.hpp>
#include <boost/asio/detail/handler_alloc_helpers.hpp>
#include <boost/asio/detail/handler_invoke_helpers.hpp>
+#include <boost/asio/detail/mutex.hpp>
+#include <boost/asio/detail/op_queue.hpp>
#include <boost/asio/detail/service_base.hpp>
#include <boost/asio/detail/socket_types.hpp>
-#include <boost/asio/detail/timer_queue.hpp>
-#include <boost/asio/detail/mutex.hpp>
+#include <boost/asio/detail/timer_op.hpp>
+#include <boost/asio/detail/timer_queue_base.hpp>
+#include <boost/asio/detail/timer_queue_fwd.hpp>
+#include <boost/asio/detail/timer_queue_set.hpp>
+#include <boost/asio/detail/win_iocp_operation.hpp>
namespace boost {
namespace asio {
namespace detail {
+class timer_op;
+
class win_iocp_io_service
: public boost::asio::detail::service_base<win_iocp_io_service>
{
public:
- // Base class for all operations. A function pointer is used instead of
- // virtual functions to avoid the associated overhead.
- //
- // This class inherits from OVERLAPPED so that we can downcast to get back to
- // the operation pointer from the LPOVERLAPPED out parameter of
- // GetQueuedCompletionStatus.
- class operation;
- friend class operation;
- class operation
- : public OVERLAPPED
- {
- public:
- typedef void (*invoke_func_type)(operation*, DWORD, size_t);
- typedef void (*destroy_func_type)(operation*);
-
- operation(win_iocp_io_service& iocp_service,
- invoke_func_type invoke_func, destroy_func_type destroy_func)
- : iocp_service_(iocp_service),
- ready_(0),
- last_error_(~DWORD(0)),
- bytes_transferred_(0),
- invoke_func_(invoke_func),
- destroy_func_(destroy_func)
- {
- Internal = 0;
- InternalHigh = 0;
- Offset = 0;
- OffsetHigh = 0;
- hEvent = 0;
-
- ::InterlockedIncrement(&iocp_service_.outstanding_operations_);
- }
-
- void reset()
- {
- Internal = 0;
- InternalHigh = 0;
- Offset = 0;
- OffsetHigh = 0;
- hEvent = 0;
- ready_ = 0;
- last_error_ = ~DWORD(0);
- bytes_transferred_ = 0;
- }
-
- void on_pending()
- {
- if (::InterlockedCompareExchange(&ready_, 1, 0) == 1)
- iocp_service_.post_completion(this, last_error_, bytes_transferred_);
- }
-
- void on_immediate_completion(DWORD last_error, DWORD bytes_transferred)
- {
- ready_ = 1;
- iocp_service_.post_completion(this, last_error, bytes_transferred);
- }
-
- bool on_completion(DWORD last_error, DWORD bytes_transferred)
- {
- if (last_error_ == ~DWORD(0))
- {
- last_error_ = last_error;
- bytes_transferred_ = bytes_transferred;
- }
-
- if (::InterlockedCompareExchange(&ready_, 1, 0) == 1)
- {
- invoke_func_(this, last_error_, bytes_transferred_);
- return true;
- }
-
- return false;
- }
-
- void destroy()
- {
- destroy_func_(this);
- }
-
- protected:
- // Prevent deletion through this type.
- ~operation()
- {
- ::InterlockedDecrement(&iocp_service_.outstanding_operations_);
- }
-
- private:
- win_iocp_io_service& iocp_service_;
- long ready_;
- DWORD last_error_;
- DWORD bytes_transferred_;
- invoke_func_type invoke_func_;
- destroy_func_type destroy_func_;
- };
+ typedef win_iocp_operation operation;
// Constructor.
win_iocp_io_service(boost::asio::io_service& io_service)
: boost::asio::detail::service_base<win_iocp_io_service>(io_service),
iocp_(),
outstanding_work_(0),
- outstanding_operations_(0),
stopped_(0),
shutdown_(0),
timer_thread_(0),
@@ -172,24 +87,34 @@ public:
{
::InterlockedExchange(&shutdown_, 1);
- while (::InterlockedExchangeAdd(&outstanding_operations_, 0) > 0)
+ while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
{
- DWORD bytes_transferred = 0;
-#if defined(WINVER) && (WINVER < 0x0500)
- DWORD completion_key = 0;
-#else
- DWORD_PTR completion_key = 0;
-#endif
- LPOVERLAPPED overlapped = 0;
- ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
- &completion_key, &overlapped, INFINITE);
- if (overlapped)
- static_cast<operation*>(overlapped)->destroy();
+ op_queue<operation> ops;
+ timer_queues_.get_all_timers(ops);
+ ops.push(completed_ops_);
+ if (!ops.empty())
+ {
+ while (operation* op = ops.front())
+ {
+ ops.pop();
+ ::InterlockedDecrement(&outstanding_work_);
+ op->destroy();
+ }
+ }
+ else
+ {
+ DWORD bytes_transferred = 0;
+ dword_ptr_t completion_key = 0;
+ LPOVERLAPPED overlapped = 0;
+ ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
+ &completion_key, &overlapped, max_timeout);
+ if (overlapped)
+ {
+ ::InterlockedDecrement(&outstanding_work_);
+ static_cast<operation*>(overlapped)->destroy();
+ }
+ }
}
-
- for (std::size_t i = 0; i < timer_queues_.size(); ++i)
- timer_queues_[i]->destroy_timers();
- timer_queues_.clear();
}
// Initialise the task. Nothing to do here.
@@ -219,6 +144,7 @@ public:
{
if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
{
+ stop();
ec = boost::system::error_code();
return 0;
}
@@ -237,6 +163,7 @@ public:
{
if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
{
+ stop();
ec = boost::system::error_code();
return 0;
}
@@ -251,6 +178,7 @@ public:
{
if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
{
+ stop();
ec = boost::system::error_code();
return 0;
}
@@ -269,6 +197,7 @@ public:
{
if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
{
+ stop();
ec = boost::system::error_code();
return 0;
}
@@ -319,7 +248,10 @@ public:
void dispatch(Handler handler)
{
if (call_stack<win_iocp_io_service>::contains(this))
+ {
+ boost::asio::detail::fenced_block b;
boost_asio_handler_invoke_helpers::invoke(handler, handler);
+ }
else
post(handler);
}
@@ -328,37 +260,128 @@ public:
template <typename Handler>
void post(Handler handler)
{
- // If the service has been shut down we silently discard the handler.
- if (::InterlockedExchangeAdd(&shutdown_, 0) != 0)
- return;
-
// Allocate and construct an operation to wrap the handler.
- typedef handler_operation<Handler> value_type;
+ typedef completion_handler<Handler> value_type;
typedef handler_alloc_traits<Handler, value_type> alloc_traits;
raw_handler_ptr<alloc_traits> raw_ptr(handler);
- handler_ptr<alloc_traits> ptr(raw_ptr, *this, handler);
+ handler_ptr<alloc_traits> ptr(raw_ptr, handler);
+
+ post_immediate_completion(ptr.get());
+ ptr.release();
+ }
+
+ // Request invocation of the given operation and return immediately. Assumes
+ // that work_started() has not yet been called for the operation.
+ void post_immediate_completion(operation* op)
+ {
+ work_started();
+ post_deferred_completion(op);
+ }
+
+ // Request invocation of the given operation and return immediately. Assumes
+ // that work_started() was previously called for the operation.
+ void post_deferred_completion(operation* op)
+ {
+ // Flag the operation as ready.
+ op->ready_ = 1;
// Enqueue the operation on the I/O completion port.
- ptr.get()->on_immediate_completion(0, 0);
+ if (!::PostQueuedCompletionStatus(iocp_.handle,
+ 0, overlapped_contains_result, op))
+ {
+ // Out of resources. Put on completed queue instead.
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ completed_ops_.push(op);
+ }
+ }
- // Operation has been successfully posted.
- ptr.release();
+ // Request invocation of the given operation and return immediately. Assumes
+ // that work_started() was previously called for the operations.
+ void post_deferred_completions(op_queue<operation>& ops)
+ {
+ while (operation* op = ops.front())
+ {
+ ops.pop();
+
+ // Flag the operation as ready.
+ op->ready_ = 1;
+
+ // Enqueue the operation on the I/O completion port.
+ if (!::PostQueuedCompletionStatus(iocp_.handle,
+ 0, overlapped_contains_result, op))
+ {
+ // Out of resources. Put on completed queue instead.
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ completed_ops_.push(op);
+ completed_ops_.push(ops);
+ }
+ }
}
- // Request invocation of the given OVERLAPPED-derived operation.
- void post_completion(operation* op, DWORD op_last_error,
- DWORD bytes_transferred)
+ // Called after starting an overlapped I/O operation that did not complete
+ // immediately. The caller must have already called work_started() prior to
+ // starting the operation.
+ void on_pending(operation* op)
{
+ if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
+ {
+ // Enqueue the operation on the I/O completion port.
+ if (!::PostQueuedCompletionStatus(iocp_.handle,
+ 0, overlapped_contains_result, op))
+ {
+ // Out of resources. Put on completed queue instead.
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ completed_ops_.push(op);
+ }
+ }
+ }
+
+ // Called after starting an overlapped I/O operation that completed
+ // immediately. The caller must have already called work_started() prior to
+ // starting the operation.
+ void on_completion(operation* op,
+ DWORD last_error = 0, DWORD bytes_transferred = 0)
+ {
+ // Flag that the operation is ready for invocation.
+ op->ready_ = 1;
+
+ // Store results in the OVERLAPPED structure.
+ op->Internal = reinterpret_cast<ulong_ptr_t>(
+ &boost::asio::error::get_system_category());
+ op->Offset = last_error;
+ op->OffsetHigh = bytes_transferred;
+
// Enqueue the operation on the I/O completion port.
if (!::PostQueuedCompletionStatus(iocp_.handle,
- bytes_transferred, op_last_error, op))
+ 0, overlapped_contains_result, op))
{
- DWORD last_error = ::GetLastError();
- boost::system::system_error e(
- boost::system::error_code(last_error,
- boost::asio::error::get_system_category()),
- "pqcs");
- boost::throw_exception(e);
+ // Out of resources. Put on completed queue instead.
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ completed_ops_.push(op);
+ }
+ }
+
+ // Called after starting an overlapped I/O operation that completed
+ // immediately. The caller must have already called work_started() prior to
+ // starting the operation.
+ void on_completion(operation* op,
+ const boost::system::error_code& ec, DWORD bytes_transferred = 0)
+ {
+ // Flag that the operation is ready for invocation.
+ op->ready_ = 1;
+
+ // Store results in the OVERLAPPED structure.
+ op->Internal = reinterpret_cast<ulong_ptr_t>(&ec.category());
+ op->Offset = ec.value();
+ op->OffsetHigh = bytes_transferred;
+
+ // Enqueue the operation on the I/O completion port.
+ if (!::PostQueuedCompletionStatus(iocp_.handle,
+ 0, overlapped_contains_result, op))
+ {
+ // Out of resources. Put on completed queue instead.
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ completed_ops_.push(op);
}
}
@@ -367,7 +390,7 @@ public:
void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
{
boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
- timer_queues_.push_back(&timer_queue);
+ timer_queues_.insert(&timer_queue);
}
// Remove a timer queue from the service.
@@ -375,36 +398,28 @@ public:
void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
{
boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
- for (std::size_t i = 0; i < timer_queues_.size(); ++i)
- {
- if (timer_queues_[i] == &timer_queue)
- {
- timer_queues_.erase(timer_queues_.begin() + i);
- return;
- }
- }
+ timer_queues_.erase(&timer_queue);
}
- // Schedule a timer in the given timer queue to expire at the specified
- // absolute time. The handler object will be invoked when the timer expires.
- template <typename Time_Traits, typename Handler>
+ // Schedule a new operation in the given timer queue to expire at the
+ // specified absolute time.
+ template <typename Time_Traits>
void schedule_timer(timer_queue<Time_Traits>& timer_queue,
- const typename Time_Traits::time_type& time, Handler handler, void* token)
+ const typename Time_Traits::time_type& time, timer_op* op, void* token)
{
// If the service has been shut down we silently discard the timer.
if (::InterlockedExchangeAdd(&shutdown_, 0) != 0)
return;
boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
- if (timer_queue.enqueue_timer(time, handler, token))
+ bool interrupt = timer_queue.enqueue_timer(time, op, token);
+ work_started();
+ if (interrupt && !timer_interrupt_issued_)
{
- if (!timer_interrupt_issued_)
- {
- timer_interrupt_issued_ = true;
- lock.unlock();
- ::PostQueuedCompletionStatus(iocp_.handle,
- 0, steal_timer_dispatching, 0);
- }
+ timer_interrupt_issued_ = true;
+ lock.unlock();
+ ::PostQueuedCompletionStatus(iocp_.handle,
+ 0, steal_timer_dispatching, 0);
}
}
@@ -418,7 +433,9 @@ public:
return 0;
boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
- std::size_t n = timer_queue.cancel_timer(token);
+ op_queue<operation> ops;
+ std::size_t n = timer_queue.cancel_timer(token, ops);
+ post_deferred_completions(ops);
if (n > 0 && !timer_interrupt_issued_)
{
timer_interrupt_issued_ = true;
@@ -430,6 +447,14 @@ public:
}
private:
+#if defined(WINVER) && (WINVER < 0x0500)
+ typedef DWORD dword_ptr_t;
+ typedef ULONG ulong_ptr_t;
+#else // defined(WINVER) && (WINVER < 0x0500)
+ typedef DWORD_PTR dword_ptr_t;
+ typedef ULONG_PTR ulong_ptr_t;
+#endif // defined(WINVER) && (WINVER < 0x0500)
+
// Dequeues at most one operation from the I/O completion port, and then
// executes it. Returns the number of operations that were dequeued (i.e.
// either 0 or 1).
@@ -454,11 +479,7 @@ private:
// Get the next operation from the queue.
DWORD bytes_transferred = 0;
-#if defined(WINVER) && (WINVER < 0x0500)
- DWORD completion_key = 0;
-#else
- DWORD_PTR completion_key = 0;
-#endif
+ dword_ptr_t completion_key = 0;
LPOVERLAPPED overlapped = 0;
::SetLastError(0);
BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
@@ -468,32 +489,11 @@ private:
// Dispatch any pending timers.
if (dispatching_timers)
{
- try
- {
- boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
- if (!timer_queues_.empty())
- {
- timer_queues_copy_ = timer_queues_;
- for (std::size_t i = 0; i < timer_queues_copy_.size(); ++i)
- {
- timer_queues_copy_[i]->dispatch_timers();
- timer_queues_copy_[i]->dispatch_cancellations();
- timer_queues_copy_[i]->complete_timers();
- }
- }
- }
- catch (...)
- {
- // Transfer responsibility for dispatching timers to another thread.
- if (::InterlockedCompareExchange(&timer_thread_,
- 0, this_thread_id) == this_thread_id)
- {
- ::PostQueuedCompletionStatus(iocp_.handle,
- 0, transfer_timer_dispatching, 0);
- }
-
- throw;
- }
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ op_queue<operation> ops;
+ ops.push(completed_ops_);
+ timer_queues_.get_ready_timers(ops);
+ post_deferred_completions(ops);
}
if (!ok && overlapped == 0)
@@ -522,11 +522,9 @@ private:
}
else if (overlapped)
{
- // We may have been passed a last_error value in the completion_key.
- if (last_error == 0)
- {
- last_error = completion_key;
- }
+ operation* op = static_cast<operation*>(overlapped);
+ boost::system::error_code result_ec(last_error,
+ boost::asio::error::get_system_category());
// Transfer responsibility for dispatching timers to another thread.
if (dispatching_timers && ::InterlockedCompareExchange(
@@ -536,14 +534,35 @@ private:
0, transfer_timer_dispatching, 0);
}
- // Ensure that the io_service does not exit due to running out of work
- // while we make the upcall.
- auto_work work(*this);
+ // We may have been passed the last_error and bytes_transferred in the
+ // OVERLAPPED structure itself.
+ if (completion_key == overlapped_contains_result)
+ {
+ result_ec = boost::system::error_code(static_cast<int>(op->Offset),
+ *reinterpret_cast<boost::system::error_category*>(op->Internal));
+ bytes_transferred = op->OffsetHigh;
+ }
- // Dispatch the operation.
- operation* op = static_cast<operation*>(overlapped);
- if (op->on_completion(last_error, bytes_transferred))
+ // Otherwise ensure any result has been saved into the OVERLAPPED
+ // structure.
+ else
{
+ op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category());
+ op->Offset = result_ec.value();
+ op->OffsetHigh = bytes_transferred;
+ }
+
+ // Dispatch the operation only if ready. The operation may not be ready
+ // if the initiating function (e.g. a call to WSARecv) has not yet
+ // returned. This is because the initiating function still wants access
+ // to the operation's OVERLAPPED structure.
+ if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
+ {
+ // Ensure the count of outstanding work is decremented on block exit.
+ work_finished_on_block_exit on_exit = { this };
+ (void)on_exit;
+
+ op->complete(*this, result_ec, bytes_transferred);
ec = boost::system::error_code();
return 1;
}
@@ -588,126 +607,23 @@ private:
}
}
- // Check if all timer queues are empty.
- bool all_timer_queues_are_empty() const
- {
- for (std::size_t i = 0; i < timer_queues_.size(); ++i)
- if (!timer_queues_[i]->empty())
- return false;
- return true;
- }
-
// Get the timeout value for the GetQueuedCompletionStatus call. The timeout
// value is returned as a number of milliseconds. We will wait no longer than
// 1000 milliseconds.
DWORD get_timeout()
{
- if (all_timer_queues_are_empty())
- return max_timeout;
-
- boost::posix_time::time_duration minimum_wait_duration
- = boost::posix_time::milliseconds(max_timeout);
-
- for (std::size_t i = 0; i < timer_queues_.size(); ++i)
- {
- boost::posix_time::time_duration wait_duration
- = timer_queues_[i]->wait_duration();
- if (wait_duration < minimum_wait_duration)
- minimum_wait_duration = wait_duration;
- }
-
- if (minimum_wait_duration > boost::posix_time::time_duration())
- {
- int milliseconds = minimum_wait_duration.total_milliseconds();
- return static_cast<DWORD>(milliseconds > 0 ? milliseconds : 1);
- }
- else
- {
- return 0;
- }
+ return timer_queues_.wait_duration_msec(max_timeout);
}
- struct auto_work
+ // Helper class to call work_finished() on block exit.
+ struct work_finished_on_block_exit
{
- auto_work(win_iocp_io_service& io_service)
- : io_service_(io_service)
+ ~work_finished_on_block_exit()
{
- io_service_.work_started();
+ io_service_->work_finished();
}
- ~auto_work()
- {
- io_service_.work_finished();
- }
-
- private:
- win_iocp_io_service& io_service_;
- };
-
- template <typename Handler>
- struct handler_operation
- : public operation
- {
- handler_operation(win_iocp_io_service& io_service,
- Handler handler)
- : operation(io_service, &handler_operation<Handler>::do_completion_impl,
- &handler_operation<Handler>::destroy_impl),
- io_service_(io_service),
- handler_(handler)
- {
- io_service_.work_started();
- }
-
- ~handler_operation()
- {
- io_service_.work_finished();
- }
-
- private:
- // Prevent copying and assignment.
- handler_operation(const handler_operation&);
- void operator=(const handler_operation&);
-
- static void do_completion_impl(operation* op, DWORD, size_t)
- {
- // Take ownership of the operation object.
- typedef handler_operation<Handler> op_type;
- op_type* handler_op(static_cast<op_type*>(op));
- typedef handler_alloc_traits<Handler, op_type> alloc_traits;
- handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
-
- // Make a copy of the handler so that the memory can be deallocated before
- // the upcall is made.
- Handler handler(handler_op->handler_);
-
- // Free the memory associated with the handler.
- ptr.reset();
-
- // Make the upcall.
- boost_asio_handler_invoke_helpers::invoke(handler, handler);
- }
-
- static void destroy_impl(operation* op)
- {
- // Take ownership of the operation object.
- typedef handler_operation<Handler> op_type;
- op_type* handler_op(static_cast<op_type*>(op));
- typedef handler_alloc_traits<Handler, op_type> alloc_traits;
- handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
-
- // A sub-object of the handler may be the true owner of the memory
- // associated with the handler. Consequently, a local copy of the handler
- // is required to ensure that any owning sub-object remains valid until
- // after we have deallocated the memory here.
- Handler handler(handler_op->handler_);
- (void)handler;
-
- // Free the memory associated with the handler.
- ptr.reset();
- }
-
- win_iocp_io_service& io_service_;
- Handler handler_;
+ win_iocp_io_service* io_service_;
};
// The IO completion port used for queueing operations.
@@ -721,10 +637,6 @@ private:
// The count of unfinished work.
long outstanding_work_;
- // The count of unfinished operations.
- long outstanding_operations_;
- friend class operation;
-
// Flag to indicate whether the event loop has been stopped.
long stopped_;
@@ -742,7 +654,12 @@ private:
// Completion key value to indicate that responsibility for dispatching
// timers should be stolen from another thread.
- steal_timer_dispatching = 2
+ steal_timer_dispatching = 2,
+
+ // Completion key value to indicate that an operation has posted with the
+ // original last_error and bytes_transferred values stored in the fields of
+ // the OVERLAPPED structure.
+ overlapped_contains_result = 3
};
// The thread that's currently in charge of dispatching timers.
@@ -755,12 +672,10 @@ private:
bool timer_interrupt_issued_;
// The timer queues.
- std::vector<timer_queue_base*> timer_queues_;
+ timer_queue_set timer_queues_;
- // A copy of the timer queues, used when dispatching, cancelling and cleaning
- // up timers. The copy is stored as a class data member to avoid unnecessary
- // memory allocation.
- std::vector<timer_queue_base*> timer_queues_copy_;
+ // The operations that are ready to dispatch.
+ op_queue<operation> completed_ops_;
};
} // namespace detail