From ca8ee84e53e7d4910a7c17cf05e70b0b3377c9c3 Mon Sep 17 00:00:00 2001
From: HanzZ <hanzz.k@gmail.com>
Date: Wed, 12 Sep 2012 07:36:27 +0200
Subject: Add support for pause/resume using Bytestream for
 SOCKS5BytestreamServerSession.

Copyright (c) 2012 Jan Kaluza
Licensed under the Simplified BSD license.
See Documentation/Licenses/BSD-simplified.txt for more information.

diff --git a/Swiften/FileTransfer/SOCKS5BytestreamServerSession.cpp b/Swiften/FileTransfer/SOCKS5BytestreamServerSession.cpp
index 2260fc20..4412d0b 100644
--- a/Swiften/FileTransfer/SOCKS5BytestreamServerSession.cpp
+++ b/Swiften/FileTransfer/SOCKS5BytestreamServerSession.cpp
@@ -19,7 +19,7 @@
 
 namespace Swift {
 
-SOCKS5BytestreamServerSession::SOCKS5BytestreamServerSession(boost::shared_ptr<Connection> connection, SOCKS5BytestreamRegistry* bytestreams) : connection(connection), bytestreams(bytestreams), state(Initial), chunkSize(131072) {
+SOCKS5BytestreamServerSession::SOCKS5BytestreamServerSession(boost::shared_ptr<Connection> connection, SOCKS5BytestreamRegistry* bytestreams) : connection(connection), bytestreams(bytestreams), state(Initial), chunkSize(131072), waitingForData(false) {
 	connection->onDisconnected.connect(boost::bind(&SOCKS5BytestreamServerSession::handleDisconnected, this, _1));
 }
 
@@ -40,6 +40,9 @@ void SOCKS5BytestreamServerSession::stop() {
 	connection->onDataWritten.disconnect(boost::bind(&SOCKS5BytestreamServerSession::sendData, this));
 	connection->onDataRead.disconnect(boost::bind(&SOCKS5BytestreamServerSession::handleDataRead, this, _1));
 	connection->disconnect();
+	if (readBytestream) {
+			readBytestream->onDataAvailable.disconnect(boost::bind(&SOCKS5BytestreamServerSession::handleDataAvailable, this));
+	}
 	state = Finished;
 }
 
@@ -75,6 +78,12 @@ void SOCKS5BytestreamServerSession::handleDataRead(boost::shared_ptr<SafeByteArr
 	}
 }
 
+void SOCKS5BytestreamServerSession::handleDataAvailable() {
+	if (waitingForData) {
+		sendData();
+	}
+}
+
 void SOCKS5BytestreamServerSession::handleDisconnected(const boost::optional<Connection::Error>& error) {
 	SWIFT_LOG(debug) << (error ? (error == Connection::ReadError ? "Read Error" : "Write Error") : "No Error") << std::endl;
 	if (error) {
@@ -136,6 +145,11 @@ void SOCKS5BytestreamServerSession::process() {
 					connection->write(result);
 					bytestreams->serverSessions[streamID] = this;
 					state = ReadyForTransfer;
+
+					if (readBytestream) {
+							readBytestream->onDataAvailable.connect(boost::bind(&SOCKS5BytestreamServerSession::handleDataAvailable, this));
+					}
+
 				}
 			}
 		}
@@ -146,8 +160,14 @@ void SOCKS5BytestreamServerSession::sendData() {
 	if (!readBytestream->isFinished()) {
 		try {
 			SafeByteArray dataToSend = createSafeByteArray(*readBytestream->read(chunkSize));
-			connection->write(dataToSend);
-			onBytesSent(dataToSend.size());
+			if (!dataToSend.empty()) {
+				connection->write(dataToSend);
+				onBytesSent(dataToSend.size());
+				waitingForData = false;
+			}
+			else {
+				waitingForData = true;
+			}
 		}
 		catch (const BytestreamException&) {
 			finish(true);
diff --git a/Swiften/FileTransfer/SOCKS5BytestreamServerSession.h b/Swiften/FileTransfer/SOCKS5BytestreamServerSession.h
index 4557a36..ed77df8 100644
--- a/Swiften/FileTransfer/SOCKS5BytestreamServerSession.h
+++ b/Swiften/FileTransfer/SOCKS5BytestreamServerSession.h
@@ -54,6 +54,7 @@ namespace Swift {
 			void process();
 			void handleDataRead(boost::shared_ptr<SafeByteArray>);
 			void handleDisconnected(const boost::optional<Connection::Error>&);
+			void handleDataAvailable();
 			void sendData();
 
 		private:
@@ -64,5 +65,6 @@ namespace Swift {
 			int chunkSize;
 			boost::shared_ptr<ReadBytestream> readBytestream;
 			boost::shared_ptr<WriteBytestream> writeBytestream;
+			bool waitingForData;
 	};
 }
diff --git a/Swiften/FileTransfer/UnitTest/SOCKS5BytestreamServerSessionTest.cpp b/Swiften/FileTransfer/UnitTest/SOCKS5BytestreamServerSessionTest.cpp
index e6df862..6dec37f 100644
--- a/Swiften/FileTransfer/UnitTest/SOCKS5BytestreamServerSessionTest.cpp
+++ b/Swiften/FileTransfer/UnitTest/SOCKS5BytestreamServerSessionTest.cpp
@@ -28,6 +28,8 @@ class SOCKS5BytestreamServerSessionTest : public CppUnit::TestFixture {
 		CPPUNIT_TEST(testRequest_UnknownBytestream);
 		CPPUNIT_TEST(testReceiveData);
 		CPPUNIT_TEST(testReceiveData_Chunked);
+		CPPUNIT_TEST(testDataStreamPauseStopsSendingData);
+		CPPUNIT_TEST(testDataStreamResumeAfterPauseSendsData);
 		CPPUNIT_TEST_SUITE_END();
 
 	public:
@@ -38,6 +40,7 @@ class SOCKS5BytestreamServerSessionTest : public CppUnit::TestFixture {
 			connection = boost::make_shared<DummyConnection>(eventLoop);
 			connection->onDataSent.connect(boost::bind(&SOCKS5BytestreamServerSessionTest::handleDataWritten, this, _1));
 			stream1 = boost::make_shared<ByteArrayReadBytestream>(createByteArray("abcdefg"));
+			finished = false;
 		}
 
 		void tearDown() {
@@ -117,6 +120,46 @@ class SOCKS5BytestreamServerSessionTest : public CppUnit::TestFixture {
 			CPPUNIT_ASSERT_EQUAL(4, receivedDataChunks);
 		}
 
+		void testDataStreamPauseStopsSendingData() {
+			boost::shared_ptr<SOCKS5BytestreamServerSession> testling(createSession());
+			testling->setChunkSize(3);
+			stream1->setDataComplete(false);
+			StartStopper<SOCKS5BytestreamServerSession> stopper(testling.get());
+			bytestreams->addReadBytestream("abcdef", stream1);
+			authenticate();
+			request("abcdef");
+			eventLoop->processEvents();
+			testling->startTransfer();
+			eventLoop->processEvents();
+			skipHeader("abcdef");
+			CPPUNIT_ASSERT(createByteArray("abcdefg") == receivedData);
+			CPPUNIT_ASSERT_EQUAL(4, receivedDataChunks);
+
+			CPPUNIT_ASSERT(!finished);
+			CPPUNIT_ASSERT(!error);
+		}
+
+		void testDataStreamResumeAfterPauseSendsData() {
+			boost::shared_ptr<SOCKS5BytestreamServerSession> testling(createSession());
+			testling->setChunkSize(3);
+			stream1->setDataComplete(false);
+			StartStopper<SOCKS5BytestreamServerSession> stopper(testling.get());
+			bytestreams->addReadBytestream("abcdef", stream1);
+			authenticate();
+			request("abcdef");
+			eventLoop->processEvents();
+			testling->startTransfer();
+			eventLoop->processEvents();
+			skipHeader("abcdef");
+
+			stream1->addData(createByteArray("xyz"));
+			eventLoop->processEvents();
+
+			CPPUNIT_ASSERT(createByteArray("abcdefgxyz") == receivedData);
+			CPPUNIT_ASSERT(!finished);
+			CPPUNIT_ASSERT(!error);
+		}
+
 	private:
 		void receive(const SafeByteArray& data) {
 			connection->receive(data);
@@ -147,9 +190,15 @@ class SOCKS5BytestreamServerSessionTest : public CppUnit::TestFixture {
 	private:
 		SOCKS5BytestreamServerSession* createSession() {
 			SOCKS5BytestreamServerSession* session = new SOCKS5BytestreamServerSession(connection, bytestreams);
+			session->onFinished.connect(boost::bind(&SOCKS5BytestreamServerSessionTest::handleFinished, this, _1));
 			return session;
 		}
 
+		void handleFinished(boost::optional<FileTransferError> error) {
+			finished = true;
+			this->error = error;
+		}
+
 	private:
 		DummyEventLoop* eventLoop;
 		SOCKS5BytestreamRegistry* bytestreams;
@@ -157,6 +206,8 @@ class SOCKS5BytestreamServerSessionTest : public CppUnit::TestFixture {
 		std::vector<unsigned char> receivedData;
 		int receivedDataChunks;
 		boost::shared_ptr<ByteArrayReadBytestream> stream1;
+		bool finished;
+		boost::optional<FileTransferError> error;
 };
 
 CPPUNIT_TEST_SUITE_REGISTRATION(SOCKS5BytestreamServerSessionTest);
-- 
cgit v0.10.2-6-g49f6