summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'Swiften')
-rw-r--r--Swiften/Network/BoostConnection.cpp23
-rw-r--r--Swiften/Network/BoostConnection.h6
-rw-r--r--Swiften/QA/NetworkTest/BoostConnectionTest.cpp43
3 files changed, 70 insertions, 2 deletions
diff --git a/Swiften/Network/BoostConnection.cpp b/Swiften/Network/BoostConnection.cpp
index f3cf9a6..7751535 100644
--- a/Swiften/Network/BoostConnection.cpp
+++ b/Swiften/Network/BoostConnection.cpp
@@ -43,7 +43,7 @@ class SharedBuffer {
// -----------------------------------------------------------------------------
BoostConnection::BoostConnection(boost::asio::io_service* ioService, EventLoop* eventLoop) :
- eventLoop(eventLoop), socket_(*ioService), readBuffer_(BUFFER_SIZE) {
+ eventLoop(eventLoop), socket_(*ioService), readBuffer_(BUFFER_SIZE), writing_(false) {
}
BoostConnection::~BoostConnection() {
@@ -67,6 +67,17 @@ void BoostConnection::disconnect() {
}
void BoostConnection::write(const ByteArray& data) {
+ boost::lock_guard<boost::mutex> lock(writeMutex_);
+ if (!writing_) {
+ writing_ = true;
+ doWrite(data);
+ }
+ else {
+ writeQueue_ += data;
+ }
+}
+
+void BoostConnection::doWrite(const ByteArray& data) {
boost::asio::async_write(socket_, SharedBuffer(data),
boost::bind(&BoostConnection::handleDataWritten, shared_from_this(), boost::asio::placeholders::error));
}
@@ -110,6 +121,16 @@ void BoostConnection::handleDataWritten(const boost::system::error_code& error)
else {
eventLoop->postEvent(boost::bind(boost::ref(onDisconnected), WriteError), shared_from_this());
}
+ {
+ boost::lock_guard<boost::mutex> lock(writeMutex_);
+ if (writeQueue_.isEmpty()) {
+ writing_ = false;
+ }
+ else {
+ doWrite(writeQueue_);
+ writeQueue_.clear();
+ }
+ }
}
HostAddressPort BoostConnection::getLocalAddress() const {
diff --git a/Swiften/Network/BoostConnection.h b/Swiften/Network/BoostConnection.h
index da4f7b8..7b15966 100644
--- a/Swiften/Network/BoostConnection.h
+++ b/Swiften/Network/BoostConnection.h
@@ -8,6 +8,7 @@
#include <boost/asio.hpp>
#include <boost/enable_shared_from_this.hpp>
+#include <boost/thread/mutex.hpp>
#include "Swiften/Network/Connection.h"
#include "Swiften/EventLoop/EventOwner.h"
@@ -50,11 +51,14 @@ namespace Swift {
void handleSocketRead(const boost::system::error_code& error, size_t bytesTransferred);
void handleDataWritten(const boost::system::error_code& error);
void doRead();
+ void doWrite(const ByteArray& data);
private:
EventLoop* eventLoop;
boost::asio::ip::tcp::socket socket_;
std::vector<char> readBuffer_;
- bool disconnecting_;
+ boost::mutex writeMutex_;
+ bool writing_;
+ ByteArray writeQueue_;
};
}
diff --git a/Swiften/QA/NetworkTest/BoostConnectionTest.cpp b/Swiften/QA/NetworkTest/BoostConnectionTest.cpp
index 530a126..7259807 100644
--- a/Swiften/QA/NetworkTest/BoostConnectionTest.cpp
+++ b/Swiften/QA/NetworkTest/BoostConnectionTest.cpp
@@ -25,6 +25,7 @@ class BoostConnectionTest : public CppUnit::TestFixture {
CPPUNIT_TEST(testDestructor);
CPPUNIT_TEST(testDestructor_PendingEvents);
CPPUNIT_TEST(testWrite);
+ CPPUNIT_TEST(testWriteMultipleSimultaniouslyQueuesWrites);
#ifdef TEST_IPV6
CPPUNIT_TEST(testWrite_IPv6);
#endif
@@ -37,6 +38,7 @@ class BoostConnectionTest : public CppUnit::TestFixture {
boostIOServiceThread_ = new BoostIOServiceThread();
eventLoop_ = new DummyEventLoop();
disconnected = false;
+ connectFinished = false;
}
void tearDown() {
@@ -88,6 +90,41 @@ class BoostConnectionTest : public CppUnit::TestFixture {
testling->disconnect();
}
+
+ void testWriteMultipleSimultaniouslyQueuesWrites() {
+ BoostConnection::ref testling(BoostConnection::create(&boostIOService, eventLoop_));
+ testling->onConnectFinished.connect(boost::bind(&BoostConnectionTest::handleConnectFinished, this));
+ testling->onDataRead.connect(boost::bind(&BoostConnectionTest::handleDataRead, this, _1));
+ testling->onDisconnected.connect(boost::bind(&BoostConnectionTest::handleDisconnected, this));
+ testling->connect(HostAddressPort(HostAddress("65.99.222.137"), 5222));
+ while (!connectFinished) {
+ boostIOService.run_one();
+ eventLoop_->processEvents();
+ }
+
+ testling->write(ByteArray("<stream:strea"));
+ testling->write(ByteArray("m"));
+ testling->write(ByteArray(">"));
+
+ // Check that we only did one write event, the others are queued
+ /*int runHandlers = */boostIOService.poll();
+ // Disabling this test, because poll runns all handlers that are added during poll() as well, so
+ // this test doesn't really work any more. We'll have to trust that things are queued.
+ //CPPUNIT_ASSERT_EQUAL(1, runHandlers);
+ // Process the other events
+ while (receivedData.isEmpty()) {
+ boostIOService.run_one();
+ eventLoop_->processEvents();
+ }
+
+ // Disconnect & clean up
+ testling->disconnect();
+ while (!disconnected) {
+ boostIOService.run_one();
+ eventLoop_->processEvents();
+ }
+ }
+
void doWrite(BoostConnection* connection) {
connection->write(ByteArray("<stream:stream>"));
connection->write(ByteArray("\r\n\r\n")); // Temporarily, while we don't have an xmpp server running on ipv6
@@ -101,11 +138,17 @@ class BoostConnectionTest : public CppUnit::TestFixture {
disconnected = true;
}
+ void handleConnectFinished() {
+ connectFinished = true;
+ }
+
private:
BoostIOServiceThread* boostIOServiceThread_;
+ boost::asio::io_service boostIOService;
DummyEventLoop* eventLoop_;
ByteArray receivedData;
bool disconnected;
+ bool connectFinished;
};
CPPUNIT_TEST_SUITE_REGISTRATION(BoostConnectionTest);