// // reactor_op_queue.hpp // ~~~~~~~~~~~~~~~~~~~~ // // Copyright (c) 2003-2010 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef BOOST_ASIO_DETAIL_REACTOR_OP_QUEUE_HPP #define BOOST_ASIO_DETAIL_REACTOR_OP_QUEUE_HPP #if defined(_MSC_VER) && (_MSC_VER >= 1200) # pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #include #include #include #include #include #include namespace boost { namespace asio { namespace detail { template class reactor_op_queue : private noncopyable { public: // Constructor. reactor_op_queue() : operations_() { } // Add a new operation to the queue. Returns true if this is the only // operation for the given descriptor, in which case the reactor's event // demultiplexing function call may need to be interrupted and restarted. bool enqueue_operation(Descriptor descriptor, reactor_op* op) { typedef typename operations_map::iterator iterator; typedef typename operations_map::value_type value_type; std::pair entry = operations_.insert(value_type(descriptor, operations())); entry.first->second.op_queue_.push(op); return entry.second; } // Cancel all operations associated with the descriptor. Any operations // pending for the descriptor will be notified that they have been cancelled // next time perform_cancellations is called. Returns true if any operations // were cancelled, in which case the reactor's event demultiplexing function // may need to be interrupted and restarted. bool cancel_operations(Descriptor descriptor, op_queue& ops, const boost::system::error_code& ec = boost::asio::error::operation_aborted) { typename operations_map::iterator i = operations_.find(descriptor); if (i != operations_.end()) { while (reactor_op* op = i->second.op_queue_.front()) { op->ec_ = ec; i->second.op_queue_.pop(); ops.push(op); } operations_.erase(i); return true; } return false; } // Whether there are no operations in the queue. bool empty() const { return operations_.empty(); } // Determine whether there are any operations associated with the descriptor. bool has_operation(Descriptor descriptor) const { return operations_.find(descriptor) != operations_.end(); } // Perform the operations corresponding to the descriptor. Returns true if // there are still unfinished operations queued for the descriptor. bool perform_operations(Descriptor descriptor, op_queue& ops) { typename operations_map::iterator i = operations_.find(descriptor); if (i != operations_.end()) { while (reactor_op* op = i->second.op_queue_.front()) { if (op->perform()) { i->second.op_queue_.pop(); ops.push(op); } else { return true; } } operations_.erase(i); } return false; } // Fill a descriptor set with the descriptors corresponding to each active // operation. The op_queue is used only when descriptors fail to be added to // the descriptor set. template void get_descriptors(Descriptor_Set& descriptors, op_queue& ops) { typename operations_map::iterator i = operations_.begin(); while (i != operations_.end()) { Descriptor descriptor = i->first; ++i; if (!descriptors.set(descriptor)) { boost::system::error_code ec(error::fd_set_failure); cancel_operations(descriptor, ops, ec); } } } // Perform the operations corresponding to the ready file descriptors // contained in the given descriptor set. template void perform_operations_for_descriptors( const Descriptor_Set& descriptors, op_queue& ops) { typename operations_map::iterator i = operations_.begin(); while (i != operations_.end()) { typename operations_map::iterator op_iter = i++; if (descriptors.is_set(op_iter->first)) { while (reactor_op* op = op_iter->second.op_queue_.front()) { if (op->perform()) { op_iter->second.op_queue_.pop(); ops.push(op); } else { break; } } if (op_iter->second.op_queue_.empty()) operations_.erase(op_iter); } } } // Get all operations owned by the queue. void get_all_operations(op_queue& ops) { typename operations_map::iterator i = operations_.begin(); while (i != operations_.end()) { typename operations_map::iterator op_iter = i++; ops.push(op_iter->second.op_queue_); operations_.erase(op_iter); } } private: struct operations { operations() {} operations(const operations&) {} void operator=(const operations&) {} // The operations waiting on the desccriptor. op_queue op_queue_; }; // The type for a map of operations. typedef hash_map operations_map; // The operations that are currently executing asynchronously. operations_map operations_; }; } // namespace detail } // namespace asio } // namespace boost #include #endif // BOOST_ASIO_DETAIL_REACTOR_OP_QUEUE_HPP