/* * Copyright (c) 2012 Yoann Blein * Licensed under the simplified BSD license. * See Documentation/Licenses/BSD-simplified.txt for more information. */ #include #include #include #include #include #include #include #include #include namespace Swift { static const size_t BUFFER_SIZE = 4096; // ----------------------------------------------------------------------------- // A reference-counted non-modifiable buffer class. class SharedBuffer { public: SharedBuffer(const SafeByteArray& data) : data_(new std::vector >(data.begin(), data.end())), buffer_(boost::asio::buffer(*data_)) { } // ConstBufferSequence requirements. typedef boost::asio::const_buffer value_type; typedef const boost::asio::const_buffer* const_iterator; const boost::asio::const_buffer* begin() const { return &buffer_; } const boost::asio::const_buffer* end() const { return &buffer_ + 1; } private: boost::shared_ptr< std::vector > > data_; boost::asio::const_buffer buffer_; }; // ----------------------------------------------------------------------------- BoostUDPSocket::BoostUDPSocket(boost::shared_ptr ioService, EventLoop* eventLoop) : eventLoop(eventLoop), ioService(ioService), socket_(*ioService), sending_(false) { } BoostUDPSocket::~BoostUDPSocket() { } void BoostUDPSocket::bind(int port) { if (!socket_.is_open()) socket_.open(boost::asio::ip::udp::v4()); socket_.bind(boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port)); } void BoostUDPSocket::listen() { if (!socket_.is_open()) socket_.open(boost::asio::ip::udp::v4()); doRead(); } void BoostUDPSocket::connect(const HostAddressPort &address) { if (!socket_.is_open()) socket_.open(boost::asio::ip::udp::v4()); socket_.connect(address.toEndpoint()); // TODO: handle errors eventLoop->postEvent(boost::bind(boost::ref(onConnected)), shared_from_this()); } void BoostUDPSocket::connectToFirstIncoming() { boost::shared_ptr buf = boost::make_shared(1); socket_.async_receive_from( boost::asio::buffer(*buf), remoteEndpoint_, boost::bind(&BoostUDPSocket::handleFirstRead, shared_from_this(), boost::asio::placeholders::error)); } void BoostUDPSocket::close() { boost::system::error_code errorCode; socket_.shutdown(boost::asio::ip::udp::socket::shutdown_send, errorCode); socket_.close(); } void BoostUDPSocket::send(const SafeByteArray& data) { boost::lock_guard lock(sendMutex_); if (!sending_) { sending_ = true; doSend(data); } else { append(sendQueue_, data); } } void BoostUDPSocket::doSend(const SafeByteArray& data) { socket_.async_send(SharedBuffer(data), boost::bind(&BoostUDPSocket::handleDataWritten, shared_from_this(), boost::asio::placeholders::error)); } void BoostUDPSocket::doRead() { readBuffer_ = boost::make_shared(BUFFER_SIZE); socket_.async_receive( boost::asio::buffer(*readBuffer_), boost::bind(&BoostUDPSocket::handleSocketRead, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } void BoostUDPSocket::handleSocketRead(const boost::system::error_code& error, size_t bytesTransferred) { SWIFT_LOG(debug) << "Socket read " << error << std::endl; if (!error) { readBuffer_->resize(bytesTransferred); eventLoop->postEvent(boost::bind(boost::ref(onDataRead), readBuffer_), shared_from_this()); doRead(); } // else if (/*error == boost::asio::error::eof ||*/ error == boost::asio::error::operation_aborted) { // eventLoop->postEvent(boost::bind(boost::ref(onDisconnected), boost::optional()), shared_from_this()); // } // else { // eventLoop->postEvent(boost::bind(boost::ref(onDisconnected), ReadError), shared_from_this()); // } } void BoostUDPSocket::handleFirstRead(const boost::system::error_code &error) { if (!error) { socket_.connect(remoteEndpoint_); // TODO: handle errors eventLoop->postEvent(boost::bind(boost::ref(onConnected)), shared_from_this()); } } void BoostUDPSocket::handleDataWritten(const boost::system::error_code& error) { SWIFT_LOG(debug) << "Data written " << error << std::endl; // if (!error) { // eventLoop->postEvent(boost::ref(onDataWritten), shared_from_this()); // } // else if (/*error == boost::asio::error::eof || */error == boost::asio::error::operation_aborted) { // eventLoop->postEvent(boost::bind(boost::ref(onDisconnected), boost::optional()), shared_from_this()); // } // else { // eventLoop->postEvent(boost::bind(boost::ref(onDisconnected), WriteError), shared_from_this()); // } boost::lock_guard lock(sendMutex_); if (sendQueue_.empty()) { sending_ = false; } else { doSend(sendQueue_); sendQueue_.clear(); } } }