From eb25d910e7991503eaa74233f0b396648f512e88 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Remko=20Tron=C3=A7on?= <git@el-tramo.be>
Date: Thu, 29 Sep 2011 22:21:01 +0200
Subject: Allow ReadBytestreams to be resumed.


diff --git a/Swiften/FileTransfer/ByteArrayReadBytestream.h b/Swiften/FileTransfer/ByteArrayReadBytestream.h
index a6945c3..6cbdef0 100644
--- a/Swiften/FileTransfer/ByteArrayReadBytestream.h
+++ b/Swiften/FileTransfer/ByteArrayReadBytestream.h
@@ -8,12 +8,13 @@
 
 #include <vector>
 
+#include <Swiften/Base/Algorithm.h>
 #include <Swiften/FileTransfer/ReadBytestream.h>
 
 namespace Swift {
 	class ByteArrayReadBytestream : public ReadBytestream {
 		public:
-			ByteArrayReadBytestream(const std::vector<unsigned char>& data) : data(data), position(0) {
+			ByteArrayReadBytestream(const std::vector<unsigned char>& data) : data(data), position(0), dataComplete(true) {
 			}
 
 			virtual std::vector<unsigned char> read(size_t size) {
@@ -29,11 +30,21 @@ namespace Swift {
 			}
 
 			virtual bool isFinished() const {
-				return position >= data.size();
+				return position >= data.size() && dataComplete;
+			}
+
+			virtual void setDataComplete(bool b) {
+				dataComplete = b;
+			}
+
+			void addData(const std::vector<unsigned char>& moreData) {
+				append(data, moreData);
+				onDataAvailable();
 			}
 
 		private:
 			std::vector<unsigned char> data;
 			size_t position;
+			bool dataComplete;
 	};
 }
diff --git a/Swiften/FileTransfer/IBBSendSession.cpp b/Swiften/FileTransfer/IBBSendSession.cpp
index b925c5f..3a1390c 100644
--- a/Swiften/FileTransfer/IBBSendSession.cpp
+++ b/Swiften/FileTransfer/IBBSendSession.cpp
@@ -14,10 +14,12 @@
 
 namespace Swift {
 
-IBBSendSession::IBBSendSession(const std::string& id, const JID& from, const JID& to, boost::shared_ptr<ReadBytestream> bytestream, IQRouter* router) : id(id), from(from), to(to), bytestream(bytestream), router(router), blockSize(4096), sequenceNumber(0), active(false) {
+IBBSendSession::IBBSendSession(const std::string& id, const JID& from, const JID& to, boost::shared_ptr<ReadBytestream> bytestream, IQRouter* router) : id(id), from(from), to(to), bytestream(bytestream), router(router), blockSize(4096), sequenceNumber(0), active(false), waitingForData(false) {
+	bytestream->onDataAvailable.connect(boost::bind(&IBBSendSession::handleDataAvailable, this));
 }
 
 IBBSendSession::~IBBSendSession() {
+	bytestream->onDataAvailable.disconnect(boost::bind(&IBBSendSession::handleDataAvailable, this));
 }
 
 void IBBSendSession::start() {
@@ -37,17 +39,7 @@ void IBBSendSession::stop() {
 void IBBSendSession::handleIBBResponse(IBB::ref, ErrorPayload::ref error) {
 	if (!error && active) {
 		if (!bytestream->isFinished()) {
-			try {
-				std::vector<unsigned char> data = bytestream->read(blockSize);
-				IBBRequest::ref request = IBBRequest::create(from, to, IBB::createIBBData(id, sequenceNumber, data), router);
-				sequenceNumber++;
-				request->onResponse.connect(boost::bind(&IBBSendSession::handleIBBResponse, this, _1, _2));
-				request->send();
-				onBytesSent(data.size());
-			}
-			catch (const BytestreamException&) {
-				finish(FileTransferError(FileTransferError::ReadError));
-			}
+			sendMoreData();
 		}
 		else {
 			finish(boost::optional<FileTransferError>());
@@ -58,9 +50,35 @@ void IBBSendSession::handleIBBResponse(IBB::ref, ErrorPayload::ref error) {
 	}
 }
 
+void IBBSendSession::sendMoreData() {
+	try {
+		std::vector<unsigned char> data = bytestream->read(blockSize);
+		if (!data.empty()) {
+			waitingForData = false;
+			IBBRequest::ref request = IBBRequest::create(from, to, IBB::createIBBData(id, sequenceNumber, data), router);
+			sequenceNumber++;
+			request->onResponse.connect(boost::bind(&IBBSendSession::handleIBBResponse, this, _1, _2));
+			request->send();
+			onBytesSent(data.size());
+		}
+		else {
+			waitingForData = true;
+		}
+	}
+	catch (const BytestreamException&) {
+		finish(FileTransferError(FileTransferError::ReadError));
+	}
+}
+
 void IBBSendSession::finish(boost::optional<FileTransferError> error) {
 	active = false;
 	onFinished(error);
 }
 
+void IBBSendSession::handleDataAvailable() {
+	if (waitingForData) {
+		sendMoreData();
+	}
+}
+
 }
diff --git a/Swiften/FileTransfer/IBBSendSession.h b/Swiften/FileTransfer/IBBSendSession.h
index 8e5ace1..abd217b 100644
--- a/Swiften/FileTransfer/IBBSendSession.h
+++ b/Swiften/FileTransfer/IBBSendSession.h
@@ -41,9 +41,12 @@ namespace Swift {
 
 			boost::signal<void (boost::optional<FileTransferError>)> onFinished;
 			boost::signal<void (int)> onBytesSent;
+
 		private:
 			void handleIBBResponse(IBB::ref, ErrorPayload::ref);
 			void finish(boost::optional<FileTransferError>);
+			void sendMoreData();
+			void handleDataAvailable();
 
 		private:
 			std::string id;
@@ -54,5 +57,6 @@ namespace Swift {
 			int blockSize;
 			int sequenceNumber;
 			bool active;
+			bool waitingForData;
 	};
 }
diff --git a/Swiften/FileTransfer/ReadBytestream.h b/Swiften/FileTransfer/ReadBytestream.h
index 9e070f7..0e95f7b 100644
--- a/Swiften/FileTransfer/ReadBytestream.h
+++ b/Swiften/FileTransfer/ReadBytestream.h
@@ -7,7 +7,6 @@
 #pragma once
 
 #include <vector>
-#include <cstring>
 
 #include <Swiften/Base/boost_bsignals.h>
 
@@ -15,10 +14,17 @@ namespace Swift {
 	class ReadBytestream {
 		public:
 			virtual ~ReadBytestream();
+
+			/**
+			 * Return an empty vector if no more data is available.
+			 * Use onDataAvailable signal for signaling there is data available again.
+			 */
 			virtual std::vector<unsigned char> read(size_t size) = 0;
+
 			virtual bool isFinished() const = 0;
 
 		public:
-			boost::signal<void (std::vector<unsigned char>)> onRead;
+			boost::signal<void ()> onDataAvailable;
+			boost::signal<void (const std::vector<unsigned char>&)> onRead;
 	};
 }
diff --git a/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp b/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp
index c88635f..d12f99e 100644
--- a/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp
+++ b/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp
@@ -26,6 +26,11 @@ class IBBSendSessionTest : public CppUnit::TestFixture {
 		CPPUNIT_TEST(testErrorResponseFinishesWithError);
 		CPPUNIT_TEST(testStopDuringSessionCloses);
 		CPPUNIT_TEST(testStopAfterFinishedDoesNotClose);
+		CPPUNIT_TEST(testDataStreamPauseStopsSendingData);
+		CPPUNIT_TEST(testDataStreamResumeAfterPauseSendsData);
+		CPPUNIT_TEST(testDataStreamResumeBeforePauseDoesNotSendData);
+		CPPUNIT_TEST(testDataStreamResumeAfterResumeDoesNotSendData);
+
 		CPPUNIT_TEST_SUITE_END();
 
 	public:
@@ -33,6 +38,7 @@ class IBBSendSessionTest : public CppUnit::TestFixture {
 			stanzaChannel = new DummyStanzaChannel();
 			iqRouter = new IQRouter(stanzaChannel);
 			bytestream = boost::shared_ptr<ByteArrayReadBytestream>(new ByteArrayReadBytestream(createByteArray("abcdefg")));
+			finished = false;
 		}
 
 		void tearDown() {
@@ -136,7 +142,66 @@ class IBBSendSessionTest : public CppUnit::TestFixture {
 
 			CPPUNIT_ASSERT_EQUAL(2, static_cast<int>(stanzaChannel->sentStanzas.size()));
 		}
-	
+
+		void testDataStreamPauseStopsSendingData() {
+			boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz");
+			bytestream->setDataComplete(false);
+			testling->setBlockSize(3);
+			testling->start();
+			stanzaChannel->onIQReceived(createIBBResult());
+			stanzaChannel->onIQReceived(createIBBResult());
+			stanzaChannel->onIQReceived(createIBBResult());
+			stanzaChannel->onIQReceived(createIBBResult());
+
+			CPPUNIT_ASSERT(!finished);
+			CPPUNIT_ASSERT(!error);
+
+			CPPUNIT_ASSERT_EQUAL(4, static_cast<int>(stanzaChannel->sentStanzas.size()));
+		}
+
+		void testDataStreamResumeAfterPauseSendsData() {
+			boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz");
+			bytestream->setDataComplete(false);
+			testling->setBlockSize(3);
+			testling->start();
+			stanzaChannel->onIQReceived(createIBBResult());
+			stanzaChannel->onIQReceived(createIBBResult());
+			stanzaChannel->onIQReceived(createIBBResult());
+			stanzaChannel->onIQReceived(createIBBResult());
+
+			bytestream->addData(createByteArray("xyz"));
+
+			CPPUNIT_ASSERT_EQUAL(5, static_cast<int>(stanzaChannel->sentStanzas.size()));
+		}
+
+		void testDataStreamResumeBeforePauseDoesNotSendData() {
+			boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz");
+			bytestream->setDataComplete(false);
+			testling->setBlockSize(3);
+			testling->start();
+			stanzaChannel->onIQReceived(createIBBResult());
+
+			bytestream->addData(createByteArray("xyz"));
+
+			CPPUNIT_ASSERT_EQUAL(2, static_cast<int>(stanzaChannel->sentStanzas.size()));
+		}
+
+		void testDataStreamResumeAfterResumeDoesNotSendData() {
+			boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz");
+			bytestream->setDataComplete(false);
+			testling->setBlockSize(3);
+			testling->start();
+			stanzaChannel->onIQReceived(createIBBResult());
+			stanzaChannel->onIQReceived(createIBBResult());
+			stanzaChannel->onIQReceived(createIBBResult());
+			stanzaChannel->onIQReceived(createIBBResult());
+
+			bytestream->addData(createByteArray("xyz"));
+			bytestream->addData(createByteArray("xuv"));
+
+			CPPUNIT_ASSERT_EQUAL(5, static_cast<int>(stanzaChannel->sentStanzas.size()));
+		}
+
 	private:
 		IQ::ref createIBBResult() {
 			return IQ::createResult(JID("baz@fum.com/dum"), stanzaChannel->sentStanzas[stanzaChannel->sentStanzas.size()-1]->getTo(), stanzaChannel->sentStanzas[stanzaChannel->sentStanzas.size()-1]->getID(), boost::shared_ptr<IBB>());
-- 
cgit v0.10.2-6-g49f6