diff options
Diffstat (limited to '3rdParty/Boost/src/boost/asio/detail/impl/task_io_service.ipp')
-rw-r--r-- | 3rdParty/Boost/src/boost/asio/detail/impl/task_io_service.ipp | 322 |
1 files changed, 255 insertions, 67 deletions
diff --git a/3rdParty/Boost/src/boost/asio/detail/impl/task_io_service.ipp b/3rdParty/Boost/src/boost/asio/detail/impl/task_io_service.ipp index babfa7b..674df63 100644 --- a/3rdParty/Boost/src/boost/asio/detail/impl/task_io_service.ipp +++ b/3rdParty/Boost/src/boost/asio/detail/impl/task_io_service.ipp @@ -2,7 +2,7 @@ // detail/impl/task_io_service.ipp // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // -// Copyright (c) 2003-2011 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// Copyright (c) 2003-2012 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -20,7 +20,6 @@ #if !defined(BOOST_ASIO_HAS_IOCP) #include <boost/limits.hpp> -#include <boost/asio/detail/call_stack.hpp> #include <boost/asio/detail/event.hpp> #include <boost/asio/detail/reactor.hpp> #include <boost/asio/detail/task_io_service.hpp> @@ -31,41 +30,73 @@ namespace boost { namespace asio { namespace detail { +struct task_io_service::thread_info +{ + event* wakeup_event; + op_queue<operation> private_op_queue; + long private_outstanding_work; + thread_info* next; +}; + struct task_io_service::task_cleanup { ~task_cleanup() { + if (this_thread_->private_outstanding_work > 0) + { + boost::asio::detail::increment( + task_io_service_->outstanding_work_, + this_thread_->private_outstanding_work); + } + this_thread_->private_outstanding_work = 0; + // Enqueue the completed operations and reinsert the task at the end of // the operation queue. lock_->lock(); task_io_service_->task_interrupted_ = true; - task_io_service_->op_queue_.push(*ops_); + task_io_service_->op_queue_.push(this_thread_->private_op_queue); task_io_service_->op_queue_.push(&task_io_service_->task_operation_); } task_io_service* task_io_service_; mutex::scoped_lock* lock_; - op_queue<operation>* ops_; + thread_info* this_thread_; }; -struct task_io_service::work_finished_on_block_exit +struct task_io_service::work_cleanup { - ~work_finished_on_block_exit() + ~work_cleanup() { - task_io_service_->work_finished(); + if (this_thread_->private_outstanding_work > 1) + { + boost::asio::detail::increment( + task_io_service_->outstanding_work_, + this_thread_->private_outstanding_work - 1); + } + else if (this_thread_->private_outstanding_work < 1) + { + task_io_service_->work_finished(); + } + this_thread_->private_outstanding_work = 0; + +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + if (!this_thread_->private_op_queue.empty()) + { + lock_->lock(); + task_io_service_->op_queue_.push(this_thread_->private_op_queue); + } +#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) } task_io_service* task_io_service_; + mutex::scoped_lock* lock_; + thread_info* this_thread_; }; -struct task_io_service::idle_thread_info -{ - event wakeup_event; - idle_thread_info* next; -}; - -task_io_service::task_io_service(boost::asio::io_service& io_service) +task_io_service::task_io_service( + boost::asio::io_service& io_service, std::size_t concurrency_hint) : boost::asio::detail::service_base<task_io_service>(io_service), + one_thread_(concurrency_hint == 1), mutex_(), task_(0), task_interrupted_(true), @@ -74,10 +105,7 @@ task_io_service::task_io_service(boost::asio::io_service& io_service) shutdown_(false), first_idle_thread_(0) { -} - -void task_io_service::init(std::size_t /*concurrency_hint*/) -{ + BOOST_ASIO_HANDLER_TRACKING_INIT; } void task_io_service::shutdown_service() @@ -119,15 +147,17 @@ std::size_t task_io_service::run(boost::system::error_code& ec) return 0; } - call_stack<task_io_service>::context ctx(this); - - idle_thread_info this_idle_thread; - this_idle_thread.next = 0; + thread_info this_thread; + event wakeup_event; + this_thread.wakeup_event = &wakeup_event; + this_thread.private_outstanding_work = 0; + this_thread.next = 0; + thread_call_stack::context ctx(this, this_thread); mutex::scoped_lock lock(mutex_); std::size_t n = 0; - for (; do_one(lock, &this_idle_thread); lock.lock()) + for (; do_run_one(lock, this_thread, ec); lock.lock()) if (n != (std::numeric_limits<std::size_t>::max)()) ++n; return n; @@ -142,31 +172,46 @@ std::size_t task_io_service::run_one(boost::system::error_code& ec) return 0; } - call_stack<task_io_service>::context ctx(this); - - idle_thread_info this_idle_thread; - this_idle_thread.next = 0; + thread_info this_thread; + event wakeup_event; + this_thread.wakeup_event = &wakeup_event; + this_thread.private_outstanding_work = 0; + this_thread.next = 0; + thread_call_stack::context ctx(this, this_thread); mutex::scoped_lock lock(mutex_); - return do_one(lock, &this_idle_thread); + return do_run_one(lock, this_thread, ec); } std::size_t task_io_service::poll(boost::system::error_code& ec) { + ec = boost::system::error_code(); if (outstanding_work_ == 0) { stop(); - ec = boost::system::error_code(); return 0; } - call_stack<task_io_service>::context ctx(this); + thread_info this_thread; + this_thread.wakeup_event = 0; + this_thread.private_outstanding_work = 0; + this_thread.next = 0; + thread_call_stack::context ctx(this, this_thread); mutex::scoped_lock lock(mutex_); +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + // We want to support nested calls to poll() and poll_one(), so any handlers + // that are already on a thread-private queue need to be put on to the main + // queue now. + if (one_thread_) + if (thread_info* outer_thread_info = ctx.next_by_key()) + op_queue_.push(outer_thread_info->private_op_queue); +#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + std::size_t n = 0; - for (; do_one(lock, 0); lock.lock()) + for (; do_poll_one(lock, this_thread, ec); lock.lock()) if (n != (std::numeric_limits<std::size_t>::max)()) ++n; return n; @@ -181,11 +226,24 @@ std::size_t task_io_service::poll_one(boost::system::error_code& ec) return 0; } - call_stack<task_io_service>::context ctx(this); + thread_info this_thread; + this_thread.wakeup_event = 0; + this_thread.private_outstanding_work = 0; + this_thread.next = 0; + thread_call_stack::context ctx(this, this_thread); mutex::scoped_lock lock(mutex_); - return do_one(lock, 0); +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + // We want to support nested calls to poll() and poll_one(), so any handlers + // that are already on a thread-private queue need to be put on to the main + // queue now. + if (one_thread_) + if (thread_info* outer_thread_info = ctx.next_by_key()) + op_queue_.push(outer_thread_info->private_op_queue); +#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + + return do_poll_one(lock, this_thread, ec); } void task_io_service::stop() @@ -194,6 +252,12 @@ void task_io_service::stop() stop_all_threads(lock); } +bool task_io_service::stopped() const +{ + mutex::scoped_lock lock(mutex_); + return stopped_; +} + void task_io_service::reset() { mutex::scoped_lock lock(mutex_); @@ -202,12 +266,37 @@ void task_io_service::reset() void task_io_service::post_immediate_completion(task_io_service::operation* op) { +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + if (one_thread_) + { + if (thread_info* this_thread = thread_call_stack::contains(this)) + { + ++this_thread->private_outstanding_work; + this_thread->private_op_queue.push(op); + return; + } + } +#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + work_started(); - post_deferred_completion(op); + mutex::scoped_lock lock(mutex_); + op_queue_.push(op); + wake_one_thread_and_unlock(lock); } void task_io_service::post_deferred_completion(task_io_service::operation* op) { +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + if (one_thread_) + { + if (thread_info* this_thread = thread_call_stack::contains(this)) + { + this_thread->private_op_queue.push(op); + return; + } + } +#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + mutex::scoped_lock lock(mutex_); op_queue_.push(op); wake_one_thread_and_unlock(lock); @@ -218,17 +307,72 @@ void task_io_service::post_deferred_completions( { if (!ops.empty()) { +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + if (one_thread_) + { + if (thread_info* this_thread = thread_call_stack::contains(this)) + { + this_thread->private_op_queue.push(ops); + return; + } + } +#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + mutex::scoped_lock lock(mutex_); op_queue_.push(ops); wake_one_thread_and_unlock(lock); } } -std::size_t task_io_service::do_one(mutex::scoped_lock& lock, - task_io_service::idle_thread_info* this_idle_thread) +void task_io_service::post_private_immediate_completion( + task_io_service::operation* op) +{ + work_started(); + post_private_deferred_completion(op); +} + +void task_io_service::post_private_deferred_completion( + task_io_service::operation* op) +{ +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + if (thread_info* this_thread = thread_call_stack::contains(this)) + { + this_thread->private_op_queue.push(op); + return; + } +#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + + mutex::scoped_lock lock(mutex_); + op_queue_.push(op); + wake_one_thread_and_unlock(lock); +} + +void task_io_service::post_non_private_immediate_completion( + task_io_service::operation* op) +{ + work_started(); + post_non_private_deferred_completion(op); +} + +void task_io_service::post_non_private_deferred_completion( + task_io_service::operation* op) +{ + mutex::scoped_lock lock(mutex_); + op_queue_.push(op); + wake_one_thread_and_unlock(lock); +} + +void task_io_service::abandon_operations( + op_queue<task_io_service::operation>& ops) +{ + op_queue<task_io_service::operation> ops2; + ops2.push(ops); +} + +std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock, + task_io_service::thread_info& this_thread, + const boost::system::error_code& ec) { - bool polling = !this_idle_thread; - bool task_has_run = false; while (!stopped_) { if (!op_queue_.empty()) @@ -240,61 +384,105 @@ std::size_t task_io_service::do_one(mutex::scoped_lock& lock, if (o == &task_operation_) { - task_interrupted_ = more_handlers || polling; + task_interrupted_ = more_handlers; - // If the task has already run and we're polling then we're done. - if (task_has_run && polling) + if (more_handlers && !one_thread_) { - task_interrupted_ = true; - op_queue_.push(&task_operation_); - return 0; + if (!wake_one_idle_thread_and_unlock(lock)) + lock.unlock(); } - task_has_run = true; - - if (!more_handlers || !wake_one_idle_thread_and_unlock(lock)) + else lock.unlock(); - op_queue<operation> completed_ops; - task_cleanup c = { this, &lock, &completed_ops }; - (void)c; + task_cleanup on_exit = { this, &lock, &this_thread }; + (void)on_exit; // Run the task. May throw an exception. Only block if the operation // queue is empty and we're not polling, otherwise we want to return // as soon as possible. - task_->run(!more_handlers && !polling, completed_ops); + task_->run(!more_handlers, this_thread.private_op_queue); } else { - if (more_handlers) + std::size_t task_result = o->task_result_; + + if (more_handlers && !one_thread_) wake_one_thread_and_unlock(lock); else lock.unlock(); // Ensure the count of outstanding work is decremented on block exit. - work_finished_on_block_exit on_exit = { this }; + work_cleanup on_exit = { this, &lock, &this_thread }; (void)on_exit; - // Complete the operation. May throw an exception. - o->complete(*this); // deletes the operation object + // Complete the operation. May throw an exception. Deletes the object. + o->complete(*this, ec, task_result); return 1; } } - else if (this_idle_thread) + else { // Nothing to run right now, so just wait for work to do. - this_idle_thread->next = first_idle_thread_; - first_idle_thread_ = this_idle_thread; - this_idle_thread->wakeup_event.clear(lock); - this_idle_thread->wakeup_event.wait(lock); + this_thread.next = first_idle_thread_; + first_idle_thread_ = &this_thread; + this_thread.wakeup_event->clear(lock); + this_thread.wakeup_event->wait(lock); } - else + } + + return 0; +} + +std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock, + task_io_service::thread_info& this_thread, + const boost::system::error_code& ec) +{ + if (stopped_) + return 0; + + operation* o = op_queue_.front(); + if (o == &task_operation_) + { + op_queue_.pop(); + lock.unlock(); + { - return 0; + task_cleanup c = { this, &lock, &this_thread }; + (void)c; + + // Run the task. May throw an exception. Only block if the operation + // queue is empty and we're not polling, otherwise we want to return + // as soon as possible. + task_->run(false, this_thread.private_op_queue); } + + o = op_queue_.front(); + if (o == &task_operation_) + return 0; } - return 0; + if (o == 0) + return 0; + + op_queue_.pop(); + bool more_handlers = (!op_queue_.empty()); + + std::size_t task_result = o->task_result_; + + if (more_handlers && !one_thread_) + wake_one_thread_and_unlock(lock); + else + lock.unlock(); + + // Ensure the count of outstanding work is decremented on block exit. + work_cleanup on_exit = { this, &lock, &this_thread }; + (void)on_exit; + + // Complete the operation. May throw an exception. Deletes the object. + o->complete(*this, ec, task_result); + + return 1; } void task_io_service::stop_all_threads( @@ -304,10 +492,10 @@ void task_io_service::stop_all_threads( while (first_idle_thread_) { - idle_thread_info* idle_thread = first_idle_thread_; + thread_info* idle_thread = first_idle_thread_; first_idle_thread_ = idle_thread->next; idle_thread->next = 0; - idle_thread->wakeup_event.signal(lock); + idle_thread->wakeup_event->signal(lock); } if (!task_interrupted_ && task_) @@ -322,10 +510,10 @@ bool task_io_service::wake_one_idle_thread_and_unlock( { if (first_idle_thread_) { - idle_thread_info* idle_thread = first_idle_thread_; + thread_info* idle_thread = first_idle_thread_; first_idle_thread_ = idle_thread->next; idle_thread->next = 0; - idle_thread->wakeup_event.signal_and_unlock(lock); + idle_thread->wakeup_event->signal_and_unlock(lock); return true; } return false; |