diff options
Diffstat (limited to '3rdParty/Boost/src/boost/asio/detail/task_io_service.hpp')
-rw-r--r-- | 3rdParty/Boost/src/boost/asio/detail/task_io_service.hpp | 287 |
1 files changed, 158 insertions, 129 deletions
diff --git a/3rdParty/Boost/src/boost/asio/detail/task_io_service.hpp b/3rdParty/Boost/src/boost/asio/detail/task_io_service.hpp index ddfea72..f6de370 100644 --- a/3rdParty/Boost/src/boost/asio/detail/task_io_service.hpp +++ b/3rdParty/Boost/src/boost/asio/detail/task_io_service.hpp @@ -15,21 +15,25 @@ # pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) -#if defined(BOOST_ASIO_ENABLE_TWO_LOCK_QUEUE) -#include <boost/asio/detail/task_io_service_2lock.hpp> -#else // defined(BOOST_ASIO_ENABLE_TWO_LOCK_QUEUE) - #include <boost/asio/detail/push_options.hpp> #include <boost/asio/io_service.hpp> #include <boost/asio/detail/call_stack.hpp> +#include <boost/asio/detail/completion_handler.hpp> #include <boost/asio/detail/event.hpp> +#include <boost/asio/detail/fenced_block.hpp> #include <boost/asio/detail/handler_alloc_helpers.hpp> #include <boost/asio/detail/handler_invoke_helpers.hpp> -#include <boost/asio/detail/handler_queue.hpp> #include <boost/asio/detail/mutex.hpp> +#include <boost/asio/detail/op_queue.hpp> #include <boost/asio/detail/service_base.hpp> #include <boost/asio/detail/task_io_service_fwd.hpp> +#include <boost/asio/detail/task_io_service_operation.hpp> + +#include <boost/asio/detail/push_options.hpp> +#include <boost/detail/atomic_count.hpp> +#include <boost/system/error_code.hpp> +#include <boost/asio/detail/pop_options.hpp> namespace boost { namespace asio { @@ -40,6 +44,8 @@ class task_io_service : public boost::asio::detail::service_base<task_io_service<Task> > { public: + typedef task_io_service_operation<Task> operation; + // Constructor. task_io_service(boost::asio::io_service& io_service) : boost::asio::detail::service_base<task_io_service<Task> >(io_service), @@ -65,12 +71,12 @@ public: lock.unlock(); // Destroy handler objects. - while (!handler_queue_.empty()) + while (!op_queue_.empty()) { - handler_queue::handler* h = handler_queue_.front(); - handler_queue_.pop(); - if (h != &task_handler_) - h->destroy(); + operation* o = op_queue_.front(); + op_queue_.pop(); + if (o != &task_operation_) + o->destroy(); } // Reset to initial state. @@ -84,14 +90,21 @@ public: if (!shutdown_ && !task_) { task_ = &use_service<Task>(this->get_io_service()); - handler_queue_.push(&task_handler_); - interrupt_one_idle_thread(lock); + op_queue_.push(&task_operation_); + wake_one_thread_and_unlock(lock); } } // Run the event loop until interrupted or no more work. size_t run(boost::system::error_code& ec) { + ec = boost::system::error_code(); + if (outstanding_work_ == 0) + { + stop(); + return 0; + } + typename call_stack<task_io_service>::context ctx(this); idle_thread_info this_idle_thread; @@ -100,7 +113,7 @@ public: boost::asio::detail::mutex::scoped_lock lock(mutex_); size_t n = 0; - while (do_one(lock, &this_idle_thread, ec)) + for (; do_one(lock, &this_idle_thread); lock.lock()) if (n != (std::numeric_limits<size_t>::max)()) ++n; return n; @@ -109,6 +122,13 @@ public: // Run until interrupted or one operation is performed. size_t run_one(boost::system::error_code& ec) { + ec = boost::system::error_code(); + if (outstanding_work_ == 0) + { + stop(); + return 0; + } + typename call_stack<task_io_service>::context ctx(this); idle_thread_info this_idle_thread; @@ -116,18 +136,25 @@ public: boost::asio::detail::mutex::scoped_lock lock(mutex_); - return do_one(lock, &this_idle_thread, ec); + return do_one(lock, &this_idle_thread); } // Poll for operations without blocking. size_t poll(boost::system::error_code& ec) { + if (outstanding_work_ == 0) + { + stop(); + ec = boost::system::error_code(); + return 0; + } + typename call_stack<task_io_service>::context ctx(this); boost::asio::detail::mutex::scoped_lock lock(mutex_); size_t n = 0; - while (do_one(lock, 0, ec)) + for (; do_one(lock, 0); lock.lock()) if (n != (std::numeric_limits<size_t>::max)()) ++n; return n; @@ -136,11 +163,18 @@ public: // Poll for one operation without blocking. size_t poll_one(boost::system::error_code& ec) { + ec = boost::system::error_code(); + if (outstanding_work_ == 0) + { + stop(); + return 0; + } + typename call_stack<task_io_service>::context ctx(this); boost::asio::detail::mutex::scoped_lock lock(mutex_); - return do_one(lock, 0, ec); + return do_one(lock, 0); } // Interrupt the event processing loop. @@ -160,16 +194,14 @@ public: // Notify that some work has started. void work_started() { - boost::asio::detail::mutex::scoped_lock lock(mutex_); ++outstanding_work_; } // Notify that some work has finished. void work_finished() { - boost::asio::detail::mutex::scoped_lock lock(mutex_); if (--outstanding_work_ == 0) - stop_all_threads(lock); + stop(); } // Request invocation of the given handler. @@ -177,7 +209,10 @@ public: void dispatch(Handler handler) { if (call_stack<task_io_service>::contains(this)) + { + boost::asio::detail::fenced_block b; boost_asio_handler_invoke_helpers::invoke(handler, handler); + } else post(handler); } @@ -187,29 +222,41 @@ public: void post(Handler handler) { // Allocate and construct an operation to wrap the handler. - handler_queue::scoped_ptr ptr(handler_queue::wrap(handler)); - - boost::asio::detail::mutex::scoped_lock lock(mutex_); - - // If the service has been shut down we silently discard the handler. - if (shutdown_) - return; + typedef completion_handler<Handler> value_type; + typedef handler_alloc_traits<Handler, value_type> alloc_traits; + raw_handler_ptr<alloc_traits> raw_ptr(handler); + handler_ptr<alloc_traits> ptr(raw_ptr, handler); - // Add the handler to the end of the queue. - handler_queue_.push(ptr.get()); + post_immediate_completion(ptr.get()); ptr.release(); + } - // An undelivered handler is treated as unfinished work. - ++outstanding_work_; + // Request invocation of the given operation and return immediately. Assumes + // that work_started() has not yet been called for the operation. + void post_immediate_completion(operation* op) + { + work_started(); + post_deferred_completion(op); + } + + // Request invocation of the given operation and return immediately. Assumes + // that work_started() was previously called for the operation. + void post_deferred_completion(operation* op) + { + boost::asio::detail::mutex::scoped_lock lock(mutex_); + op_queue_.push(op); + wake_one_thread_and_unlock(lock); + } - // Wake up a thread to execute the handler. - if (!interrupt_one_idle_thread(lock)) + // Request invocation of the given operations and return immediately. Assumes + // that work_started() was previously called for each operation. + void post_deferred_completions(op_queue<operation>& ops) + { + if (!ops.empty()) { - if (!task_interrupted_ && task_) - { - task_interrupted_ = true; - task_->interrupt(); - } + boost::asio::detail::mutex::scoped_lock lock(mutex_); + op_queue_.push(ops); + wake_one_thread_and_unlock(lock); } } @@ -217,57 +264,58 @@ private: struct idle_thread_info; size_t do_one(boost::asio::detail::mutex::scoped_lock& lock, - idle_thread_info* this_idle_thread, boost::system::error_code& ec) + idle_thread_info* this_idle_thread) { - if (outstanding_work_ == 0 && !stopped_) - { - stop_all_threads(lock); - ec = boost::system::error_code(); - return 0; - } - bool polling = !this_idle_thread; bool task_has_run = false; while (!stopped_) { - if (!handler_queue_.empty()) + if (!op_queue_.empty()) { // Prepare to execute first handler from queue. - handler_queue::handler* h = handler_queue_.front(); - handler_queue_.pop(); + operation* o = op_queue_.front(); + op_queue_.pop(); + bool more_handlers = (!op_queue_.empty()); - if (h == &task_handler_) + if (o == &task_operation_) { - bool more_handlers = (!handler_queue_.empty()); task_interrupted_ = more_handlers || polling; // If the task has already run and we're polling then we're done. if (task_has_run && polling) { task_interrupted_ = true; - handler_queue_.push(&task_handler_); - ec = boost::system::error_code(); + op_queue_.push(&task_operation_); return 0; } task_has_run = true; - lock.unlock(); - task_cleanup c(lock, *this); + if (!more_handlers || !wake_one_idle_thread_and_unlock(lock)) + lock.unlock(); - // Run the task. May throw an exception. Only block if the handler - // queue is empty and we have an idle_thread_info object, otherwise - // we want to return as soon as possible. - task_->run(!more_handlers && !polling); + op_queue<operation> completed_ops; + task_cleanup c = { this, &lock, &completed_ops }; + (void)c; + + // Run the task. May throw an exception. Only block if the operation + // queue is empty and we're not polling, otherwise we want to return + // as soon as possible. + task_->run(!more_handlers && !polling, completed_ops); } else { - lock.unlock(); - handler_cleanup c(lock, *this); + if (more_handlers) + wake_one_thread_and_unlock(lock); + else + lock.unlock(); + + // Ensure the count of outstanding work is decremented on block exit. + work_finished_on_block_exit on_exit = { this }; + (void)on_exit; - // Invoke the handler. May throw an exception. - h->invoke(); // invoke() deletes the handler object + // Complete the operation. May throw an exception. + o->complete(*this); // deletes the operation object - ec = boost::system::error_code(); return 1; } } @@ -281,12 +329,10 @@ private: } else { - ec = boost::system::error_code(); return 0; } } - ec = boost::system::error_code(); return 0; } @@ -295,7 +341,15 @@ private: boost::asio::detail::mutex::scoped_lock& lock) { stopped_ = true; - interrupt_all_idle_threads(lock); + + while (first_idle_thread_) + { + idle_thread_info* idle_thread = first_idle_thread_; + first_idle_thread_ = idle_thread->next; + idle_thread->next = 0; + idle_thread->wakeup_event.signal(lock); + } + if (!task_interrupted_ && task_) { task_interrupted_ = true; @@ -303,9 +357,10 @@ private: } } - // Interrupt a single idle thread. Returns true if a thread was interrupted, - // false if no running thread could be found to interrupt. - bool interrupt_one_idle_thread( + // Wakes a single idle thread and unlocks the mutex. Returns true if an idle + // thread was found. If there is no idle thread, returns false and leaves the + // mutex locked. + bool wake_one_idle_thread_and_unlock( boost::asio::detail::mutex::scoped_lock& lock) { if (first_idle_thread_) @@ -313,74 +368,56 @@ private: idle_thread_info* idle_thread = first_idle_thread_; first_idle_thread_ = idle_thread->next; idle_thread->next = 0; - idle_thread->wakeup_event.signal(lock); + idle_thread->wakeup_event.signal_and_unlock(lock); return true; } return false; } - // Interrupt all idle threads. - void interrupt_all_idle_threads( + // Wake a single idle thread, or the task, and always unlock the mutex. + void wake_one_thread_and_unlock( boost::asio::detail::mutex::scoped_lock& lock) { - while (first_idle_thread_) + if (!wake_one_idle_thread_and_unlock(lock)) { - idle_thread_info* idle_thread = first_idle_thread_; - first_idle_thread_ = idle_thread->next; - idle_thread->next = 0; - idle_thread->wakeup_event.signal(lock); + if (!task_interrupted_ && task_) + { + task_interrupted_ = true; + task_->interrupt(); + } + lock.unlock(); } } // Helper class to perform task-related operations on block exit. - class task_cleanup; - friend class task_cleanup; - class task_cleanup + struct task_cleanup; + friend struct task_cleanup; + struct task_cleanup { - public: - task_cleanup(boost::asio::detail::mutex::scoped_lock& lock, - task_io_service& task_io_svc) - : lock_(lock), - task_io_service_(task_io_svc) - { - } - ~task_cleanup() { - // Reinsert the task at the end of the handler queue. - lock_.lock(); - task_io_service_.task_interrupted_ = true; - task_io_service_.handler_queue_.push(&task_io_service_.task_handler_); + // Enqueue the completed operations and reinsert the task at the end of + // the operation queue. + lock_->lock(); + task_io_service_->task_interrupted_ = true; + task_io_service_->op_queue_.push(*ops_); + task_io_service_->op_queue_.push(&task_io_service_->task_operation_); } - private: - boost::asio::detail::mutex::scoped_lock& lock_; - task_io_service& task_io_service_; + task_io_service* task_io_service_; + boost::asio::detail::mutex::scoped_lock* lock_; + op_queue<operation>* ops_; }; - // Helper class to perform handler-related operations on block exit. - class handler_cleanup; - friend class handler_cleanup; - class handler_cleanup + // Helper class to call work_finished() on block exit. + struct work_finished_on_block_exit { - public: - handler_cleanup(boost::asio::detail::mutex::scoped_lock& lock, - task_io_service& task_io_svc) - : lock_(lock), - task_io_service_(task_io_svc) - { - } - - ~handler_cleanup() + ~work_finished_on_block_exit() { - lock_.lock(); - if (--task_io_service_.outstanding_work_ == 0) - task_io_service_.stop_all_threads(lock_); + task_io_service_->work_finished(); } - private: - boost::asio::detail::mutex::scoped_lock& lock_; - task_io_service& task_io_service_; + task_io_service* task_io_service_; }; // Mutex to protect access to internal data. @@ -389,25 +426,20 @@ private: // The task to be run by this service. Task* task_; - // Handler object to represent the position of the task in the queue. - class task_handler - : public handler_queue::handler + // Operation object to represent the position of the task in the queue. + struct task_operation : public operation { - public: - task_handler() - : handler_queue::handler(0, 0) - { - } - } task_handler_; + task_operation() : operation(0) {} + } task_operation_; // Whether the task has been interrupted. bool task_interrupted_; // The count of unfinished work. - int outstanding_work_; + boost::detail::atomic_count outstanding_work_; // The queue of handlers that are ready to be delivered. - handler_queue handler_queue_; + op_queue<operation> op_queue_; // Flag to indicate that the dispatcher has been stopped. bool stopped_; @@ -422,7 +454,7 @@ private: idle_thread_info* next; }; - // The number of threads that are currently idle. + // The threads that are currently idle. idle_thread_info* first_idle_thread_; }; @@ -430,9 +462,6 @@ private: } // namespace asio } // namespace boost -#include <boost/system/error_code.hpp> #include <boost/asio/detail/pop_options.hpp> -#endif // defined(BOOST_ASIO_ENABLE_TWO_LOCK_QUEUE) - #endif // BOOST_ASIO_DETAIL_TASK_IO_SERVICE_HPP |