/* * Copyright (c) 2010-2016 Isode Limited. * All rights reserved. * See the COPYING file for more information. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace Swift { static const size_t BUFFER_SIZE = 4096; // ----------------------------------------------------------------------------- // A reference-counted non-modifiable buffer class. class SharedBuffer { public: SharedBuffer(const SafeByteArray& data) : data_(new std::vector >(data.begin(), data.end())), buffer_(boost::asio::buffer(*data_)) { } // ConstBufferSequence requirements. typedef boost::asio::const_buffer value_type; typedef const boost::asio::const_buffer* const_iterator; const boost::asio::const_buffer* begin() const { return &buffer_; } const boost::asio::const_buffer* end() const { return &buffer_ + 1; } private: std::shared_ptr< std::vector > > data_; boost::asio::const_buffer buffer_; }; // ----------------------------------------------------------------------------- BoostConnection::BoostConnection(std::shared_ptr ioService, EventLoop* eventLoop) : eventLoop(eventLoop), ioService(ioService), socket_(*ioService), writing_(false), closeSocketAfterNextWrite_(false) { } BoostConnection::~BoostConnection() { } void BoostConnection::listen() { doRead(); } void BoostConnection::connect(const HostAddressPort& addressPort) { boost::asio::ip::tcp::endpoint endpoint( boost::asio::ip::address::from_string(addressPort.getAddress().toString()), boost::numeric_cast(addressPort.getPort())); socket_.async_connect( endpoint, boost::bind(&BoostConnection::handleConnectFinished, shared_from_this(), boost::asio::placeholders::error)); } void BoostConnection::disconnect() { //MainEventLoop::removeEventsFromOwner(shared_from_this()); // Mac OS X apparently exhibits a problem where closing a socket during a write could potentially go into uninterruptable sleep. // See e.g. http://bugs.python.org/issue7401 // We therefore wait until any pending write finishes, which hopefully should fix our hang on exit during close(). std::lock_guard lock(writeMutex_); if (writing_) { closeSocketAfterNextWrite_ = true; } else { closeSocket(); } } void BoostConnection::closeSocket() { std::lock_guard lock(readCloseMutex_); boost::system::error_code errorCode; socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, errorCode); socket_.close(); } void BoostConnection::write(const SafeByteArray& data) { std::lock_guard lock(writeMutex_); if (!writing_) { writing_ = true; doWrite(data); } else { append(writeQueue_, data); } } void BoostConnection::doWrite(const SafeByteArray& data) { boost::asio::async_write(socket_, SharedBuffer(data), boost::bind(&BoostConnection::handleDataWritten, shared_from_this(), boost::asio::placeholders::error)); } void BoostConnection::handleConnectFinished(const boost::system::error_code& error) { SWIFT_LOG(debug) << "Connect finished: " << error << std::endl; if (!error) { eventLoop->postEvent(boost::bind(boost::ref(onConnectFinished), false), shared_from_this()); doRead(); } else if (error != boost::asio::error::operation_aborted) { eventLoop->postEvent(boost::bind(boost::ref(onConnectFinished), true), shared_from_this()); } } void BoostConnection::doRead() { readBuffer_ = std::make_shared(BUFFER_SIZE); std::lock_guard lock(readCloseMutex_); socket_.async_read_some( boost::asio::buffer(*readBuffer_), boost::bind(&BoostConnection::handleSocketRead, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } void BoostConnection::handleSocketRead(const boost::system::error_code& error, size_t bytesTransferred) { SWIFT_LOG(debug) << "Socket read " << error << std::endl; if (!error) { readBuffer_->resize(bytesTransferred); eventLoop->postEvent(boost::bind(boost::ref(onDataRead), readBuffer_), shared_from_this()); doRead(); } else if (/*error == boost::asio::error::eof ||*/ error == boost::asio::error::operation_aborted) { eventLoop->postEvent(boost::bind(boost::ref(onDisconnected), boost::optional()), shared_from_this()); } else { eventLoop->postEvent(boost::bind(boost::ref(onDisconnected), ReadError), shared_from_this()); } } void BoostConnection::handleDataWritten(const boost::system::error_code& error) { SWIFT_LOG(debug) << "Data written " << error << std::endl; if (!error) { eventLoop->postEvent(boost::ref(onDataWritten), shared_from_this()); } else if (/*error == boost::asio::error::eof || */error == boost::asio::error::operation_aborted) { eventLoop->postEvent(boost::bind(boost::ref(onDisconnected), boost::optional()), shared_from_this()); } else { eventLoop->postEvent(boost::bind(boost::ref(onDisconnected), WriteError), shared_from_this()); } { std::lock_guard lock(writeMutex_); if (writeQueue_.empty()) { writing_ = false; if (closeSocketAfterNextWrite_) { closeSocket(); } } else { doWrite(writeQueue_); writeQueue_.clear(); } } } HostAddressPort BoostConnection::getLocalAddress() const { return HostAddressPort(socket_.local_endpoint()); } HostAddressPort BoostConnection::getRemoteAddress() const { return HostAddressPort(socket_.remote_endpoint()); } }