summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Swiften/FileTransfer/SOCKS5BytestreamServerSession.cpp26
-rw-r--r--Swiften/FileTransfer/SOCKS5BytestreamServerSession.h2
-rw-r--r--Swiften/FileTransfer/UnitTest/SOCKS5BytestreamServerSessionTest.cpp51
3 files changed, 76 insertions, 3 deletions
diff --git a/Swiften/FileTransfer/SOCKS5BytestreamServerSession.cpp b/Swiften/FileTransfer/SOCKS5BytestreamServerSession.cpp
index 2260fc20..4412d0b 100644
--- a/Swiften/FileTransfer/SOCKS5BytestreamServerSession.cpp
+++ b/Swiften/FileTransfer/SOCKS5BytestreamServerSession.cpp
@@ -19,7 +19,7 @@
namespace Swift {
-SOCKS5BytestreamServerSession::SOCKS5BytestreamServerSession(boost::shared_ptr<Connection> connection, SOCKS5BytestreamRegistry* bytestreams) : connection(connection), bytestreams(bytestreams), state(Initial), chunkSize(131072) {
+SOCKS5BytestreamServerSession::SOCKS5BytestreamServerSession(boost::shared_ptr<Connection> connection, SOCKS5BytestreamRegistry* bytestreams) : connection(connection), bytestreams(bytestreams), state(Initial), chunkSize(131072), waitingForData(false) {
connection->onDisconnected.connect(boost::bind(&SOCKS5BytestreamServerSession::handleDisconnected, this, _1));
}
@@ -40,6 +40,9 @@ void SOCKS5BytestreamServerSession::stop() {
connection->onDataWritten.disconnect(boost::bind(&SOCKS5BytestreamServerSession::sendData, this));
connection->onDataRead.disconnect(boost::bind(&SOCKS5BytestreamServerSession::handleDataRead, this, _1));
connection->disconnect();
+ if (readBytestream) {
+ readBytestream->onDataAvailable.disconnect(boost::bind(&SOCKS5BytestreamServerSession::handleDataAvailable, this));
+ }
state = Finished;
}
@@ -75,6 +78,12 @@ void SOCKS5BytestreamServerSession::handleDataRead(boost::shared_ptr<SafeByteArr
}
}
+void SOCKS5BytestreamServerSession::handleDataAvailable() {
+ if (waitingForData) {
+ sendData();
+ }
+}
+
void SOCKS5BytestreamServerSession::handleDisconnected(const boost::optional<Connection::Error>& error) {
SWIFT_LOG(debug) << (error ? (error == Connection::ReadError ? "Read Error" : "Write Error") : "No Error") << std::endl;
if (error) {
@@ -136,6 +145,11 @@ void SOCKS5BytestreamServerSession::process() {
connection->write(result);
bytestreams->serverSessions[streamID] = this;
state = ReadyForTransfer;
+
+ if (readBytestream) {
+ readBytestream->onDataAvailable.connect(boost::bind(&SOCKS5BytestreamServerSession::handleDataAvailable, this));
+ }
+
}
}
}
@@ -146,8 +160,14 @@ void SOCKS5BytestreamServerSession::sendData() {
if (!readBytestream->isFinished()) {
try {
SafeByteArray dataToSend = createSafeByteArray(*readBytestream->read(chunkSize));
- connection->write(dataToSend);
- onBytesSent(dataToSend.size());
+ if (!dataToSend.empty()) {
+ connection->write(dataToSend);
+ onBytesSent(dataToSend.size());
+ waitingForData = false;
+ }
+ else {
+ waitingForData = true;
+ }
}
catch (const BytestreamException&) {
finish(true);
diff --git a/Swiften/FileTransfer/SOCKS5BytestreamServerSession.h b/Swiften/FileTransfer/SOCKS5BytestreamServerSession.h
index 4557a36..ed77df8 100644
--- a/Swiften/FileTransfer/SOCKS5BytestreamServerSession.h
+++ b/Swiften/FileTransfer/SOCKS5BytestreamServerSession.h
@@ -54,6 +54,7 @@ namespace Swift {
void process();
void handleDataRead(boost::shared_ptr<SafeByteArray>);
void handleDisconnected(const boost::optional<Connection::Error>&);
+ void handleDataAvailable();
void sendData();
private:
@@ -64,5 +65,6 @@ namespace Swift {
int chunkSize;
boost::shared_ptr<ReadBytestream> readBytestream;
boost::shared_ptr<WriteBytestream> writeBytestream;
+ bool waitingForData;
};
}
diff --git a/Swiften/FileTransfer/UnitTest/SOCKS5BytestreamServerSessionTest.cpp b/Swiften/FileTransfer/UnitTest/SOCKS5BytestreamServerSessionTest.cpp
index e6df862..6dec37f 100644
--- a/Swiften/FileTransfer/UnitTest/SOCKS5BytestreamServerSessionTest.cpp
+++ b/Swiften/FileTransfer/UnitTest/SOCKS5BytestreamServerSessionTest.cpp
@@ -28,6 +28,8 @@ class SOCKS5BytestreamServerSessionTest : public CppUnit::TestFixture {
CPPUNIT_TEST(testRequest_UnknownBytestream);
CPPUNIT_TEST(testReceiveData);
CPPUNIT_TEST(testReceiveData_Chunked);
+ CPPUNIT_TEST(testDataStreamPauseStopsSendingData);
+ CPPUNIT_TEST(testDataStreamResumeAfterPauseSendsData);
CPPUNIT_TEST_SUITE_END();
public:
@@ -38,6 +40,7 @@ class SOCKS5BytestreamServerSessionTest : public CppUnit::TestFixture {
connection = boost::make_shared<DummyConnection>(eventLoop);
connection->onDataSent.connect(boost::bind(&SOCKS5BytestreamServerSessionTest::handleDataWritten, this, _1));
stream1 = boost::make_shared<ByteArrayReadBytestream>(createByteArray("abcdefg"));
+ finished = false;
}
void tearDown() {
@@ -117,6 +120,46 @@ class SOCKS5BytestreamServerSessionTest : public CppUnit::TestFixture {
CPPUNIT_ASSERT_EQUAL(4, receivedDataChunks);
}
+ void testDataStreamPauseStopsSendingData() {
+ boost::shared_ptr<SOCKS5BytestreamServerSession> testling(createSession());
+ testling->setChunkSize(3);
+ stream1->setDataComplete(false);
+ StartStopper<SOCKS5BytestreamServerSession> stopper(testling.get());
+ bytestreams->addReadBytestream("abcdef", stream1);
+ authenticate();
+ request("abcdef");
+ eventLoop->processEvents();
+ testling->startTransfer();
+ eventLoop->processEvents();
+ skipHeader("abcdef");
+ CPPUNIT_ASSERT(createByteArray("abcdefg") == receivedData);
+ CPPUNIT_ASSERT_EQUAL(4, receivedDataChunks);
+
+ CPPUNIT_ASSERT(!finished);
+ CPPUNIT_ASSERT(!error);
+ }
+
+ void testDataStreamResumeAfterPauseSendsData() {
+ boost::shared_ptr<SOCKS5BytestreamServerSession> testling(createSession());
+ testling->setChunkSize(3);
+ stream1->setDataComplete(false);
+ StartStopper<SOCKS5BytestreamServerSession> stopper(testling.get());
+ bytestreams->addReadBytestream("abcdef", stream1);
+ authenticate();
+ request("abcdef");
+ eventLoop->processEvents();
+ testling->startTransfer();
+ eventLoop->processEvents();
+ skipHeader("abcdef");
+
+ stream1->addData(createByteArray("xyz"));
+ eventLoop->processEvents();
+
+ CPPUNIT_ASSERT(createByteArray("abcdefgxyz") == receivedData);
+ CPPUNIT_ASSERT(!finished);
+ CPPUNIT_ASSERT(!error);
+ }
+
private:
void receive(const SafeByteArray& data) {
connection->receive(data);
@@ -147,9 +190,15 @@ class SOCKS5BytestreamServerSessionTest : public CppUnit::TestFixture {
private:
SOCKS5BytestreamServerSession* createSession() {
SOCKS5BytestreamServerSession* session = new SOCKS5BytestreamServerSession(connection, bytestreams);
+ session->onFinished.connect(boost::bind(&SOCKS5BytestreamServerSessionTest::handleFinished, this, _1));
return session;
}
+ void handleFinished(boost::optional<FileTransferError> error) {
+ finished = true;
+ this->error = error;
+ }
+
private:
DummyEventLoop* eventLoop;
SOCKS5BytestreamRegistry* bytestreams;
@@ -157,6 +206,8 @@ class SOCKS5BytestreamServerSessionTest : public CppUnit::TestFixture {
std::vector<unsigned char> receivedData;
int receivedDataChunks;
boost::shared_ptr<ByteArrayReadBytestream> stream1;
+ bool finished;
+ boost::optional<FileTransferError> error;
};
CPPUNIT_TEST_SUITE_REGISTRATION(SOCKS5BytestreamServerSessionTest);