summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '3rdParty/Boost/boost/asio/detail/win_iocp_io_service.hpp')
-rw-r--r--3rdParty/Boost/boost/asio/detail/win_iocp_io_service.hpp738
1 files changed, 738 insertions, 0 deletions
diff --git a/3rdParty/Boost/boost/asio/detail/win_iocp_io_service.hpp b/3rdParty/Boost/boost/asio/detail/win_iocp_io_service.hpp
new file mode 100644
index 0000000..5818542
--- /dev/null
+++ b/3rdParty/Boost/boost/asio/detail/win_iocp_io_service.hpp
@@ -0,0 +1,738 @@
+//
+// win_iocp_io_service.hpp
+// ~~~~~~~~~~~~~~~~~~~~~~~
+//
+// Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP
+#define BOOST_ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP
+
+#if defined(_MSC_VER) && (_MSC_VER >= 1200)
+# pragma once
+#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
+
+#include <boost/asio/detail/push_options.hpp>
+
+#include <boost/asio/detail/win_iocp_io_service_fwd.hpp>
+
+#if defined(BOOST_ASIO_HAS_IOCP)
+
+#include <boost/asio/detail/push_options.hpp>
+#include <limits>
+#include <boost/throw_exception.hpp>
+#include <boost/system/system_error.hpp>
+#include <boost/asio/detail/pop_options.hpp>
+
+#include <boost/asio/io_service.hpp>
+#include <boost/asio/detail/call_stack.hpp>
+#include <boost/asio/detail/handler_alloc_helpers.hpp>
+#include <boost/asio/detail/handler_invoke_helpers.hpp>
+#include <boost/asio/detail/service_base.hpp>
+#include <boost/asio/detail/socket_types.hpp>
+#include <boost/asio/detail/timer_queue.hpp>
+#include <boost/asio/detail/mutex.hpp>
+
+namespace boost {
+namespace asio {
+namespace detail {
+
+class win_iocp_io_service
+ : public boost::asio::detail::service_base<win_iocp_io_service>
+{
+public:
+ // Base class for all operations. A function pointer is used instead of
+ // virtual functions to avoid the associated overhead.
+ //
+ // This class inherits from OVERLAPPED so that we can downcast to get back to
+ // the operation pointer from the LPOVERLAPPED out parameter of
+ // GetQueuedCompletionStatus.
+ class operation
+ : public OVERLAPPED
+ {
+ public:
+ typedef void (*invoke_func_type)(operation*, DWORD, size_t);
+ typedef void (*destroy_func_type)(operation*);
+
+ operation(win_iocp_io_service& iocp_service,
+ invoke_func_type invoke_func, destroy_func_type destroy_func)
+ : outstanding_operations_(&iocp_service.outstanding_operations_),
+ invoke_func_(invoke_func),
+ destroy_func_(destroy_func)
+ {
+ Internal = 0;
+ InternalHigh = 0;
+ Offset = 0;
+ OffsetHigh = 0;
+ hEvent = 0;
+
+ ::InterlockedIncrement(outstanding_operations_);
+ }
+
+ void do_completion(DWORD last_error, size_t bytes_transferred)
+ {
+ invoke_func_(this, last_error, bytes_transferred);
+ }
+
+ void destroy()
+ {
+ destroy_func_(this);
+ }
+
+ protected:
+ // Prevent deletion through this type.
+ ~operation()
+ {
+ ::InterlockedDecrement(outstanding_operations_);
+ }
+
+ private:
+ long* outstanding_operations_;
+ invoke_func_type invoke_func_;
+ destroy_func_type destroy_func_;
+ };
+
+
+ // Constructor.
+ win_iocp_io_service(boost::asio::io_service& io_service)
+ : boost::asio::detail::service_base<win_iocp_io_service>(io_service),
+ iocp_(),
+ outstanding_work_(0),
+ outstanding_operations_(0),
+ stopped_(0),
+ shutdown_(0),
+ timer_thread_(0),
+ timer_interrupt_issued_(false)
+ {
+ }
+
+ void init(size_t concurrency_hint)
+ {
+ iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
+ static_cast<DWORD>((std::min<size_t>)(concurrency_hint, DWORD(~0))));
+ if (!iocp_.handle)
+ {
+ DWORD last_error = ::GetLastError();
+ boost::system::system_error e(
+ boost::system::error_code(last_error,
+ boost::asio::error::get_system_category()),
+ "iocp");
+ boost::throw_exception(e);
+ }
+ }
+
+ // Destroy all user-defined handler objects owned by the service.
+ void shutdown_service()
+ {
+ ::InterlockedExchange(&shutdown_, 1);
+
+ while (::InterlockedExchangeAdd(&outstanding_operations_, 0) > 0)
+ {
+ DWORD bytes_transferred = 0;
+#if (WINVER < 0x0500)
+ DWORD completion_key = 0;
+#else
+ DWORD_PTR completion_key = 0;
+#endif
+ LPOVERLAPPED overlapped = 0;
+ ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
+ &completion_key, &overlapped, INFINITE);
+ if (overlapped)
+ static_cast<operation*>(overlapped)->destroy();
+ }
+
+ for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+ timer_queues_[i]->destroy_timers();
+ timer_queues_.clear();
+ }
+
+ // Initialise the task. Nothing to do here.
+ void init_task()
+ {
+ }
+
+ // Register a handle with the IO completion port.
+ boost::system::error_code register_handle(
+ HANDLE handle, boost::system::error_code& ec)
+ {
+ if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0) == 0)
+ {
+ DWORD last_error = ::GetLastError();
+ ec = boost::system::error_code(last_error,
+ boost::asio::error::get_system_category());
+ }
+ else
+ {
+ ec = boost::system::error_code();
+ }
+ return ec;
+ }
+
+ // Run the event loop until stopped or no more work.
+ size_t run(boost::system::error_code& ec)
+ {
+ if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
+ {
+ ec = boost::system::error_code();
+ return 0;
+ }
+
+ call_stack<win_iocp_io_service>::context ctx(this);
+
+ size_t n = 0;
+ while (do_one(true, ec))
+ if (n != (std::numeric_limits<size_t>::max)())
+ ++n;
+ return n;
+ }
+
+ // Run until stopped or one operation is performed.
+ size_t run_one(boost::system::error_code& ec)
+ {
+ if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
+ {
+ ec = boost::system::error_code();
+ return 0;
+ }
+
+ call_stack<win_iocp_io_service>::context ctx(this);
+
+ return do_one(true, ec);
+ }
+
+ // Poll for operations without blocking.
+ size_t poll(boost::system::error_code& ec)
+ {
+ if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
+ {
+ ec = boost::system::error_code();
+ return 0;
+ }
+
+ call_stack<win_iocp_io_service>::context ctx(this);
+
+ size_t n = 0;
+ while (do_one(false, ec))
+ if (n != (std::numeric_limits<size_t>::max)())
+ ++n;
+ return n;
+ }
+
+ // Poll for one operation without blocking.
+ size_t poll_one(boost::system::error_code& ec)
+ {
+ if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
+ {
+ ec = boost::system::error_code();
+ return 0;
+ }
+
+ call_stack<win_iocp_io_service>::context ctx(this);
+
+ return do_one(false, ec);
+ }
+
+ // Stop the event processing loop.
+ void stop()
+ {
+ if (::InterlockedExchange(&stopped_, 1) == 0)
+ {
+ if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
+ {
+ DWORD last_error = ::GetLastError();
+ boost::system::system_error e(
+ boost::system::error_code(last_error,
+ boost::asio::error::get_system_category()),
+ "pqcs");
+ boost::throw_exception(e);
+ }
+ }
+ }
+
+ // Reset in preparation for a subsequent run invocation.
+ void reset()
+ {
+ ::InterlockedExchange(&stopped_, 0);
+ }
+
+ // Notify that some work has started.
+ void work_started()
+ {
+ ::InterlockedIncrement(&outstanding_work_);
+ }
+
+ // Notify that some work has finished.
+ void work_finished()
+ {
+ if (::InterlockedDecrement(&outstanding_work_) == 0)
+ stop();
+ }
+
+ // Request invocation of the given handler.
+ template <typename Handler>
+ void dispatch(Handler handler)
+ {
+ if (call_stack<win_iocp_io_service>::contains(this))
+ boost_asio_handler_invoke_helpers::invoke(handler, &handler);
+ else
+ post(handler);
+ }
+
+ // Request invocation of the given handler and return immediately.
+ template <typename Handler>
+ void post(Handler handler)
+ {
+ // If the service has been shut down we silently discard the handler.
+ if (::InterlockedExchangeAdd(&shutdown_, 0) != 0)
+ return;
+
+ // Allocate and construct an operation to wrap the handler.
+ typedef handler_operation<Handler> value_type;
+ typedef handler_alloc_traits<Handler, value_type> alloc_traits;
+ raw_handler_ptr<alloc_traits> raw_ptr(handler);
+ handler_ptr<alloc_traits> ptr(raw_ptr, *this, handler);
+
+ // Enqueue the operation on the I/O completion port.
+ if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, ptr.get()))
+ {
+ DWORD last_error = ::GetLastError();
+ boost::system::system_error e(
+ boost::system::error_code(last_error,
+ boost::asio::error::get_system_category()),
+ "pqcs");
+ boost::throw_exception(e);
+ }
+
+ // Operation has been successfully posted.
+ ptr.release();
+ }
+
+ // Request invocation of the given OVERLAPPED-derived operation.
+ void post_completion(operation* op, DWORD op_last_error,
+ DWORD bytes_transferred)
+ {
+ // Enqueue the operation on the I/O completion port.
+ if (!::PostQueuedCompletionStatus(iocp_.handle,
+ bytes_transferred, op_last_error, op))
+ {
+ DWORD last_error = ::GetLastError();
+ boost::system::system_error e(
+ boost::system::error_code(last_error,
+ boost::asio::error::get_system_category()),
+ "pqcs");
+ boost::throw_exception(e);
+ }
+ }
+
+ // Add a new timer queue to the service.
+ template <typename Time_Traits>
+ void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
+ {
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ timer_queues_.push_back(&timer_queue);
+ }
+
+ // Remove a timer queue from the service.
+ template <typename Time_Traits>
+ void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
+ {
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+ {
+ if (timer_queues_[i] == &timer_queue)
+ {
+ timer_queues_.erase(timer_queues_.begin() + i);
+ return;
+ }
+ }
+ }
+
+ // Schedule a timer in the given timer queue to expire at the specified
+ // absolute time. The handler object will be invoked when the timer expires.
+ template <typename Time_Traits, typename Handler>
+ void schedule_timer(timer_queue<Time_Traits>& timer_queue,
+ const typename Time_Traits::time_type& time, Handler handler, void* token)
+ {
+ // If the service has been shut down we silently discard the timer.
+ if (::InterlockedExchangeAdd(&shutdown_, 0) != 0)
+ return;
+
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ if (timer_queue.enqueue_timer(time, handler, token))
+ {
+ if (!timer_interrupt_issued_)
+ {
+ timer_interrupt_issued_ = true;
+ lock.unlock();
+ ::PostQueuedCompletionStatus(iocp_.handle,
+ 0, steal_timer_dispatching, 0);
+ }
+ }
+ }
+
+ // Cancel the timer associated with the given token. Returns the number of
+ // handlers that have been posted or dispatched.
+ template <typename Time_Traits>
+ std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
+ {
+ // If the service has been shut down we silently ignore the cancellation.
+ if (::InterlockedExchangeAdd(&shutdown_, 0) != 0)
+ return 0;
+
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ std::size_t n = timer_queue.cancel_timer(token);
+ if (n > 0 && !timer_interrupt_issued_)
+ {
+ timer_interrupt_issued_ = true;
+ lock.unlock();
+ ::PostQueuedCompletionStatus(iocp_.handle,
+ 0, steal_timer_dispatching, 0);
+ }
+ return n;
+ }
+
+private:
+ // Dequeues at most one operation from the I/O completion port, and then
+ // executes it. Returns the number of operations that were dequeued (i.e.
+ // either 0 or 1).
+ size_t do_one(bool block, boost::system::error_code& ec)
+ {
+ long this_thread_id = static_cast<long>(::GetCurrentThreadId());
+
+ for (;;)
+ {
+ // Try to acquire responsibility for dispatching timers.
+ bool dispatching_timers = (::InterlockedCompareExchange(
+ &timer_thread_, this_thread_id, 0) == 0);
+
+ // Calculate timeout for GetQueuedCompletionStatus call.
+ DWORD timeout = max_timeout;
+ if (dispatching_timers)
+ {
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ timer_interrupt_issued_ = false;
+ timeout = get_timeout();
+ }
+
+ // Get the next operation from the queue.
+ DWORD bytes_transferred = 0;
+#if (WINVER < 0x0500)
+ DWORD completion_key = 0;
+#else
+ DWORD_PTR completion_key = 0;
+#endif
+ LPOVERLAPPED overlapped = 0;
+ ::SetLastError(0);
+ BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
+ &completion_key, &overlapped, block ? timeout : 0);
+ DWORD last_error = ::GetLastError();
+
+ // Dispatch any pending timers.
+ if (dispatching_timers)
+ {
+ try
+ {
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ if (!timer_queues_.empty())
+ {
+ timer_queues_copy_ = timer_queues_;
+ for (std::size_t i = 0; i < timer_queues_copy_.size(); ++i)
+ {
+ timer_queues_copy_[i]->dispatch_timers();
+ timer_queues_copy_[i]->dispatch_cancellations();
+ timer_queues_copy_[i]->complete_timers();
+ }
+ }
+ }
+ catch (...)
+ {
+ // Transfer responsibility for dispatching timers to another thread.
+ if (::InterlockedCompareExchange(&timer_thread_,
+ 0, this_thread_id) == this_thread_id)
+ {
+ ::PostQueuedCompletionStatus(iocp_.handle,
+ 0, transfer_timer_dispatching, 0);
+ }
+
+ throw;
+ }
+ }
+
+ if (!ok && overlapped == 0)
+ {
+ if (block && last_error == WAIT_TIMEOUT)
+ {
+ // Relinquish responsibility for dispatching timers.
+ if (dispatching_timers)
+ {
+ ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
+ }
+
+ continue;
+ }
+
+ // Transfer responsibility for dispatching timers to another thread.
+ if (dispatching_timers && ::InterlockedCompareExchange(
+ &timer_thread_, 0, this_thread_id) == this_thread_id)
+ {
+ ::PostQueuedCompletionStatus(iocp_.handle,
+ 0, transfer_timer_dispatching, 0);
+ }
+
+ ec = boost::system::error_code();
+ return 0;
+ }
+ else if (overlapped)
+ {
+ // We may have been passed a last_error value in the completion_key.
+ if (last_error == 0)
+ {
+ last_error = completion_key;
+ }
+
+ // Transfer responsibility for dispatching timers to another thread.
+ if (dispatching_timers && ::InterlockedCompareExchange(
+ &timer_thread_, 0, this_thread_id) == this_thread_id)
+ {
+ ::PostQueuedCompletionStatus(iocp_.handle,
+ 0, transfer_timer_dispatching, 0);
+ }
+
+ // Ensure that the io_service does not exit due to running out of work
+ // while we make the upcall.
+ auto_work work(*this);
+
+ // Dispatch the operation.
+ operation* op = static_cast<operation*>(overlapped);
+ op->do_completion(last_error, bytes_transferred);
+
+ ec = boost::system::error_code();
+ return 1;
+ }
+ else if (completion_key == transfer_timer_dispatching)
+ {
+ // Woken up to try to acquire responsibility for dispatching timers.
+ ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
+ }
+ else if (completion_key == steal_timer_dispatching)
+ {
+ // Woken up to steal responsibility for dispatching timers.
+ ::InterlockedExchange(&timer_thread_, 0);
+ }
+ else
+ {
+ // Relinquish responsibility for dispatching timers. If the io_service
+ // is not being stopped then the thread will get an opportunity to
+ // reacquire timer responsibility on the next loop iteration.
+ if (dispatching_timers)
+ {
+ ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
+ }
+
+ // The stopped_ flag is always checked to ensure that any leftover
+ // interrupts from a previous run invocation are ignored.
+ if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
+ {
+ // Wake up next thread that is blocked on GetQueuedCompletionStatus.
+ if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
+ {
+ last_error = ::GetLastError();
+ ec = boost::system::error_code(last_error,
+ boost::asio::error::get_system_category());
+ return 0;
+ }
+
+ ec = boost::system::error_code();
+ return 0;
+ }
+ }
+ }
+ }
+
+ // Check if all timer queues are empty.
+ bool all_timer_queues_are_empty() const
+ {
+ for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+ if (!timer_queues_[i]->empty())
+ return false;
+ return true;
+ }
+
+ // Get the timeout value for the GetQueuedCompletionStatus call. The timeout
+ // value is returned as a number of milliseconds. We will wait no longer than
+ // 1000 milliseconds.
+ DWORD get_timeout()
+ {
+ if (all_timer_queues_are_empty())
+ return max_timeout;
+
+ boost::posix_time::time_duration minimum_wait_duration
+ = boost::posix_time::milliseconds(max_timeout);
+
+ for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+ {
+ boost::posix_time::time_duration wait_duration
+ = timer_queues_[i]->wait_duration();
+ if (wait_duration < minimum_wait_duration)
+ minimum_wait_duration = wait_duration;
+ }
+
+ if (minimum_wait_duration > boost::posix_time::time_duration())
+ {
+ int milliseconds = minimum_wait_duration.total_milliseconds();
+ return static_cast<DWORD>(milliseconds > 0 ? milliseconds : 1);
+ }
+ else
+ {
+ return 0;
+ }
+ }
+
+ struct auto_work
+ {
+ auto_work(win_iocp_io_service& io_service)
+ : io_service_(io_service)
+ {
+ io_service_.work_started();
+ }
+
+ ~auto_work()
+ {
+ io_service_.work_finished();
+ }
+
+ private:
+ win_iocp_io_service& io_service_;
+ };
+
+ template <typename Handler>
+ struct handler_operation
+ : public operation
+ {
+ handler_operation(win_iocp_io_service& io_service,
+ Handler handler)
+ : operation(io_service, &handler_operation<Handler>::do_completion_impl,
+ &handler_operation<Handler>::destroy_impl),
+ io_service_(io_service),
+ handler_(handler)
+ {
+ io_service_.work_started();
+ }
+
+ ~handler_operation()
+ {
+ io_service_.work_finished();
+ }
+
+ private:
+ // Prevent copying and assignment.
+ handler_operation(const handler_operation&);
+ void operator=(const handler_operation&);
+
+ static void do_completion_impl(operation* op, DWORD, size_t)
+ {
+ // Take ownership of the operation object.
+ typedef handler_operation<Handler> op_type;
+ op_type* handler_op(static_cast<op_type*>(op));
+ typedef handler_alloc_traits<Handler, op_type> alloc_traits;
+ handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
+
+ // Make a copy of the handler so that the memory can be deallocated before
+ // the upcall is made.
+ Handler handler(handler_op->handler_);
+
+ // Free the memory associated with the handler.
+ ptr.reset();
+
+ // Make the upcall.
+ boost_asio_handler_invoke_helpers::invoke(handler, &handler);
+ }
+
+ static void destroy_impl(operation* op)
+ {
+ // Take ownership of the operation object.
+ typedef handler_operation<Handler> op_type;
+ op_type* handler_op(static_cast<op_type*>(op));
+ typedef handler_alloc_traits<Handler, op_type> alloc_traits;
+ handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
+
+ // A sub-object of the handler may be the true owner of the memory
+ // associated with the handler. Consequently, a local copy of the handler
+ // is required to ensure that any owning sub-object remains valid until
+ // after we have deallocated the memory here.
+ Handler handler(handler_op->handler_);
+ (void)handler;
+
+ // Free the memory associated with the handler.
+ ptr.reset();
+ }
+
+ win_iocp_io_service& io_service_;
+ Handler handler_;
+ };
+
+ // The IO completion port used for queueing operations.
+ struct iocp_holder
+ {
+ HANDLE handle;
+ iocp_holder() : handle(0) {}
+ ~iocp_holder() { if (handle) ::CloseHandle(handle); }
+ } iocp_;
+
+ // The count of unfinished work.
+ long outstanding_work_;
+
+ // The count of unfinished operations.
+ long outstanding_operations_;
+ friend class operation;
+
+ // Flag to indicate whether the event loop has been stopped.
+ long stopped_;
+
+ // Flag to indicate whether the service has been shut down.
+ long shutdown_;
+
+ enum
+ {
+ // Maximum GetQueuedCompletionStatus timeout, in milliseconds.
+ max_timeout = 500,
+
+ // Completion key value to indicate that responsibility for dispatching
+ // timers is being cooperatively transferred from one thread to another.
+ transfer_timer_dispatching = 1,
+
+ // Completion key value to indicate that responsibility for dispatching
+ // timers should be stolen from another thread.
+ steal_timer_dispatching = 2
+ };
+
+ // The thread that's currently in charge of dispatching timers.
+ long timer_thread_;
+
+ // Mutex for protecting access to the timer queues.
+ mutex timer_mutex_;
+
+ // Whether a thread has been interrupted to process a new timeout.
+ bool timer_interrupt_issued_;
+
+ // The timer queues.
+ std::vector<timer_queue_base*> timer_queues_;
+
+ // A copy of the timer queues, used when dispatching, cancelling and cleaning
+ // up timers. The copy is stored as a class data member to avoid unnecessary
+ // memory allocation.
+ std::vector<timer_queue_base*> timer_queues_copy_;
+};
+
+} // namespace detail
+} // namespace asio
+} // namespace boost
+
+#endif // defined(BOOST_ASIO_HAS_IOCP)
+
+#include <boost/asio/detail/pop_options.hpp>
+
+#endif // BOOST_ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP