From ca8ee84e53e7d4910a7c17cf05e70b0b3377c9c3 Mon Sep 17 00:00:00 2001 From: HanzZ <hanzz.k@gmail.com> Date: Wed, 12 Sep 2012 07:36:27 +0200 Subject: Add support for pause/resume using Bytestream for SOCKS5BytestreamServerSession. Copyright (c) 2012 Jan Kaluza Licensed under the Simplified BSD license. See Documentation/Licenses/BSD-simplified.txt for more information. diff --git a/Swiften/FileTransfer/SOCKS5BytestreamServerSession.cpp b/Swiften/FileTransfer/SOCKS5BytestreamServerSession.cpp index 2260fc20..4412d0b 100644 --- a/Swiften/FileTransfer/SOCKS5BytestreamServerSession.cpp +++ b/Swiften/FileTransfer/SOCKS5BytestreamServerSession.cpp @@ -19,7 +19,7 @@ namespace Swift { -SOCKS5BytestreamServerSession::SOCKS5BytestreamServerSession(boost::shared_ptr<Connection> connection, SOCKS5BytestreamRegistry* bytestreams) : connection(connection), bytestreams(bytestreams), state(Initial), chunkSize(131072) { +SOCKS5BytestreamServerSession::SOCKS5BytestreamServerSession(boost::shared_ptr<Connection> connection, SOCKS5BytestreamRegistry* bytestreams) : connection(connection), bytestreams(bytestreams), state(Initial), chunkSize(131072), waitingForData(false) { connection->onDisconnected.connect(boost::bind(&SOCKS5BytestreamServerSession::handleDisconnected, this, _1)); } @@ -40,6 +40,9 @@ void SOCKS5BytestreamServerSession::stop() { connection->onDataWritten.disconnect(boost::bind(&SOCKS5BytestreamServerSession::sendData, this)); connection->onDataRead.disconnect(boost::bind(&SOCKS5BytestreamServerSession::handleDataRead, this, _1)); connection->disconnect(); + if (readBytestream) { + readBytestream->onDataAvailable.disconnect(boost::bind(&SOCKS5BytestreamServerSession::handleDataAvailable, this)); + } state = Finished; } @@ -75,6 +78,12 @@ void SOCKS5BytestreamServerSession::handleDataRead(boost::shared_ptr<SafeByteArr } } +void SOCKS5BytestreamServerSession::handleDataAvailable() { + if (waitingForData) { + sendData(); + } +} + void SOCKS5BytestreamServerSession::handleDisconnected(const boost::optional<Connection::Error>& error) { SWIFT_LOG(debug) << (error ? (error == Connection::ReadError ? "Read Error" : "Write Error") : "No Error") << std::endl; if (error) { @@ -136,6 +145,11 @@ void SOCKS5BytestreamServerSession::process() { connection->write(result); bytestreams->serverSessions[streamID] = this; state = ReadyForTransfer; + + if (readBytestream) { + readBytestream->onDataAvailable.connect(boost::bind(&SOCKS5BytestreamServerSession::handleDataAvailable, this)); + } + } } } @@ -146,8 +160,14 @@ void SOCKS5BytestreamServerSession::sendData() { if (!readBytestream->isFinished()) { try { SafeByteArray dataToSend = createSafeByteArray(*readBytestream->read(chunkSize)); - connection->write(dataToSend); - onBytesSent(dataToSend.size()); + if (!dataToSend.empty()) { + connection->write(dataToSend); + onBytesSent(dataToSend.size()); + waitingForData = false; + } + else { + waitingForData = true; + } } catch (const BytestreamException&) { finish(true); diff --git a/Swiften/FileTransfer/SOCKS5BytestreamServerSession.h b/Swiften/FileTransfer/SOCKS5BytestreamServerSession.h index 4557a36..ed77df8 100644 --- a/Swiften/FileTransfer/SOCKS5BytestreamServerSession.h +++ b/Swiften/FileTransfer/SOCKS5BytestreamServerSession.h @@ -54,6 +54,7 @@ namespace Swift { void process(); void handleDataRead(boost::shared_ptr<SafeByteArray>); void handleDisconnected(const boost::optional<Connection::Error>&); + void handleDataAvailable(); void sendData(); private: @@ -64,5 +65,6 @@ namespace Swift { int chunkSize; boost::shared_ptr<ReadBytestream> readBytestream; boost::shared_ptr<WriteBytestream> writeBytestream; + bool waitingForData; }; } diff --git a/Swiften/FileTransfer/UnitTest/SOCKS5BytestreamServerSessionTest.cpp b/Swiften/FileTransfer/UnitTest/SOCKS5BytestreamServerSessionTest.cpp index e6df862..6dec37f 100644 --- a/Swiften/FileTransfer/UnitTest/SOCKS5BytestreamServerSessionTest.cpp +++ b/Swiften/FileTransfer/UnitTest/SOCKS5BytestreamServerSessionTest.cpp @@ -28,6 +28,8 @@ class SOCKS5BytestreamServerSessionTest : public CppUnit::TestFixture { CPPUNIT_TEST(testRequest_UnknownBytestream); CPPUNIT_TEST(testReceiveData); CPPUNIT_TEST(testReceiveData_Chunked); + CPPUNIT_TEST(testDataStreamPauseStopsSendingData); + CPPUNIT_TEST(testDataStreamResumeAfterPauseSendsData); CPPUNIT_TEST_SUITE_END(); public: @@ -38,6 +40,7 @@ class SOCKS5BytestreamServerSessionTest : public CppUnit::TestFixture { connection = boost::make_shared<DummyConnection>(eventLoop); connection->onDataSent.connect(boost::bind(&SOCKS5BytestreamServerSessionTest::handleDataWritten, this, _1)); stream1 = boost::make_shared<ByteArrayReadBytestream>(createByteArray("abcdefg")); + finished = false; } void tearDown() { @@ -117,6 +120,46 @@ class SOCKS5BytestreamServerSessionTest : public CppUnit::TestFixture { CPPUNIT_ASSERT_EQUAL(4, receivedDataChunks); } + void testDataStreamPauseStopsSendingData() { + boost::shared_ptr<SOCKS5BytestreamServerSession> testling(createSession()); + testling->setChunkSize(3); + stream1->setDataComplete(false); + StartStopper<SOCKS5BytestreamServerSession> stopper(testling.get()); + bytestreams->addReadBytestream("abcdef", stream1); + authenticate(); + request("abcdef"); + eventLoop->processEvents(); + testling->startTransfer(); + eventLoop->processEvents(); + skipHeader("abcdef"); + CPPUNIT_ASSERT(createByteArray("abcdefg") == receivedData); + CPPUNIT_ASSERT_EQUAL(4, receivedDataChunks); + + CPPUNIT_ASSERT(!finished); + CPPUNIT_ASSERT(!error); + } + + void testDataStreamResumeAfterPauseSendsData() { + boost::shared_ptr<SOCKS5BytestreamServerSession> testling(createSession()); + testling->setChunkSize(3); + stream1->setDataComplete(false); + StartStopper<SOCKS5BytestreamServerSession> stopper(testling.get()); + bytestreams->addReadBytestream("abcdef", stream1); + authenticate(); + request("abcdef"); + eventLoop->processEvents(); + testling->startTransfer(); + eventLoop->processEvents(); + skipHeader("abcdef"); + + stream1->addData(createByteArray("xyz")); + eventLoop->processEvents(); + + CPPUNIT_ASSERT(createByteArray("abcdefgxyz") == receivedData); + CPPUNIT_ASSERT(!finished); + CPPUNIT_ASSERT(!error); + } + private: void receive(const SafeByteArray& data) { connection->receive(data); @@ -147,9 +190,15 @@ class SOCKS5BytestreamServerSessionTest : public CppUnit::TestFixture { private: SOCKS5BytestreamServerSession* createSession() { SOCKS5BytestreamServerSession* session = new SOCKS5BytestreamServerSession(connection, bytestreams); + session->onFinished.connect(boost::bind(&SOCKS5BytestreamServerSessionTest::handleFinished, this, _1)); return session; } + void handleFinished(boost::optional<FileTransferError> error) { + finished = true; + this->error = error; + } + private: DummyEventLoop* eventLoop; SOCKS5BytestreamRegistry* bytestreams; @@ -157,6 +206,8 @@ class SOCKS5BytestreamServerSessionTest : public CppUnit::TestFixture { std::vector<unsigned char> receivedData; int receivedDataChunks; boost::shared_ptr<ByteArrayReadBytestream> stream1; + bool finished; + boost::optional<FileTransferError> error; }; CPPUNIT_TEST_SUITE_REGISTRATION(SOCKS5BytestreamServerSessionTest); -- cgit v0.10.2-6-g49f6