From eb25d910e7991503eaa74233f0b396648f512e88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Remko=20Tron=C3=A7on?= <git@el-tramo.be> Date: Thu, 29 Sep 2011 22:21:01 +0200 Subject: Allow ReadBytestreams to be resumed. diff --git a/Swiften/FileTransfer/ByteArrayReadBytestream.h b/Swiften/FileTransfer/ByteArrayReadBytestream.h index a6945c3..6cbdef0 100644 --- a/Swiften/FileTransfer/ByteArrayReadBytestream.h +++ b/Swiften/FileTransfer/ByteArrayReadBytestream.h @@ -8,12 +8,13 @@ #include <vector> +#include <Swiften/Base/Algorithm.h> #include <Swiften/FileTransfer/ReadBytestream.h> namespace Swift { class ByteArrayReadBytestream : public ReadBytestream { public: - ByteArrayReadBytestream(const std::vector<unsigned char>& data) : data(data), position(0) { + ByteArrayReadBytestream(const std::vector<unsigned char>& data) : data(data), position(0), dataComplete(true) { } virtual std::vector<unsigned char> read(size_t size) { @@ -29,11 +30,21 @@ namespace Swift { } virtual bool isFinished() const { - return position >= data.size(); + return position >= data.size() && dataComplete; + } + + virtual void setDataComplete(bool b) { + dataComplete = b; + } + + void addData(const std::vector<unsigned char>& moreData) { + append(data, moreData); + onDataAvailable(); } private: std::vector<unsigned char> data; size_t position; + bool dataComplete; }; } diff --git a/Swiften/FileTransfer/IBBSendSession.cpp b/Swiften/FileTransfer/IBBSendSession.cpp index b925c5f..3a1390c 100644 --- a/Swiften/FileTransfer/IBBSendSession.cpp +++ b/Swiften/FileTransfer/IBBSendSession.cpp @@ -14,10 +14,12 @@ namespace Swift { -IBBSendSession::IBBSendSession(const std::string& id, const JID& from, const JID& to, boost::shared_ptr<ReadBytestream> bytestream, IQRouter* router) : id(id), from(from), to(to), bytestream(bytestream), router(router), blockSize(4096), sequenceNumber(0), active(false) { +IBBSendSession::IBBSendSession(const std::string& id, const JID& from, const JID& to, boost::shared_ptr<ReadBytestream> bytestream, IQRouter* router) : id(id), from(from), to(to), bytestream(bytestream), router(router), blockSize(4096), sequenceNumber(0), active(false), waitingForData(false) { + bytestream->onDataAvailable.connect(boost::bind(&IBBSendSession::handleDataAvailable, this)); } IBBSendSession::~IBBSendSession() { + bytestream->onDataAvailable.disconnect(boost::bind(&IBBSendSession::handleDataAvailable, this)); } void IBBSendSession::start() { @@ -37,17 +39,7 @@ void IBBSendSession::stop() { void IBBSendSession::handleIBBResponse(IBB::ref, ErrorPayload::ref error) { if (!error && active) { if (!bytestream->isFinished()) { - try { - std::vector<unsigned char> data = bytestream->read(blockSize); - IBBRequest::ref request = IBBRequest::create(from, to, IBB::createIBBData(id, sequenceNumber, data), router); - sequenceNumber++; - request->onResponse.connect(boost::bind(&IBBSendSession::handleIBBResponse, this, _1, _2)); - request->send(); - onBytesSent(data.size()); - } - catch (const BytestreamException&) { - finish(FileTransferError(FileTransferError::ReadError)); - } + sendMoreData(); } else { finish(boost::optional<FileTransferError>()); @@ -58,9 +50,35 @@ void IBBSendSession::handleIBBResponse(IBB::ref, ErrorPayload::ref error) { } } +void IBBSendSession::sendMoreData() { + try { + std::vector<unsigned char> data = bytestream->read(blockSize); + if (!data.empty()) { + waitingForData = false; + IBBRequest::ref request = IBBRequest::create(from, to, IBB::createIBBData(id, sequenceNumber, data), router); + sequenceNumber++; + request->onResponse.connect(boost::bind(&IBBSendSession::handleIBBResponse, this, _1, _2)); + request->send(); + onBytesSent(data.size()); + } + else { + waitingForData = true; + } + } + catch (const BytestreamException&) { + finish(FileTransferError(FileTransferError::ReadError)); + } +} + void IBBSendSession::finish(boost::optional<FileTransferError> error) { active = false; onFinished(error); } +void IBBSendSession::handleDataAvailable() { + if (waitingForData) { + sendMoreData(); + } +} + } diff --git a/Swiften/FileTransfer/IBBSendSession.h b/Swiften/FileTransfer/IBBSendSession.h index 8e5ace1..abd217b 100644 --- a/Swiften/FileTransfer/IBBSendSession.h +++ b/Swiften/FileTransfer/IBBSendSession.h @@ -41,9 +41,12 @@ namespace Swift { boost::signal<void (boost::optional<FileTransferError>)> onFinished; boost::signal<void (int)> onBytesSent; + private: void handleIBBResponse(IBB::ref, ErrorPayload::ref); void finish(boost::optional<FileTransferError>); + void sendMoreData(); + void handleDataAvailable(); private: std::string id; @@ -54,5 +57,6 @@ namespace Swift { int blockSize; int sequenceNumber; bool active; + bool waitingForData; }; } diff --git a/Swiften/FileTransfer/ReadBytestream.h b/Swiften/FileTransfer/ReadBytestream.h index 9e070f7..0e95f7b 100644 --- a/Swiften/FileTransfer/ReadBytestream.h +++ b/Swiften/FileTransfer/ReadBytestream.h @@ -7,7 +7,6 @@ #pragma once #include <vector> -#include <cstring> #include <Swiften/Base/boost_bsignals.h> @@ -15,10 +14,17 @@ namespace Swift { class ReadBytestream { public: virtual ~ReadBytestream(); + + /** + * Return an empty vector if no more data is available. + * Use onDataAvailable signal for signaling there is data available again. + */ virtual std::vector<unsigned char> read(size_t size) = 0; + virtual bool isFinished() const = 0; public: - boost::signal<void (std::vector<unsigned char>)> onRead; + boost::signal<void ()> onDataAvailable; + boost::signal<void (const std::vector<unsigned char>&)> onRead; }; } diff --git a/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp b/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp index c88635f..d12f99e 100644 --- a/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp +++ b/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp @@ -26,6 +26,11 @@ class IBBSendSessionTest : public CppUnit::TestFixture { CPPUNIT_TEST(testErrorResponseFinishesWithError); CPPUNIT_TEST(testStopDuringSessionCloses); CPPUNIT_TEST(testStopAfterFinishedDoesNotClose); + CPPUNIT_TEST(testDataStreamPauseStopsSendingData); + CPPUNIT_TEST(testDataStreamResumeAfterPauseSendsData); + CPPUNIT_TEST(testDataStreamResumeBeforePauseDoesNotSendData); + CPPUNIT_TEST(testDataStreamResumeAfterResumeDoesNotSendData); + CPPUNIT_TEST_SUITE_END(); public: @@ -33,6 +38,7 @@ class IBBSendSessionTest : public CppUnit::TestFixture { stanzaChannel = new DummyStanzaChannel(); iqRouter = new IQRouter(stanzaChannel); bytestream = boost::shared_ptr<ByteArrayReadBytestream>(new ByteArrayReadBytestream(createByteArray("abcdefg"))); + finished = false; } void tearDown() { @@ -136,7 +142,66 @@ class IBBSendSessionTest : public CppUnit::TestFixture { CPPUNIT_ASSERT_EQUAL(2, static_cast<int>(stanzaChannel->sentStanzas.size())); } - + + void testDataStreamPauseStopsSendingData() { + boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz"); + bytestream->setDataComplete(false); + testling->setBlockSize(3); + testling->start(); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + + CPPUNIT_ASSERT(!finished); + CPPUNIT_ASSERT(!error); + + CPPUNIT_ASSERT_EQUAL(4, static_cast<int>(stanzaChannel->sentStanzas.size())); + } + + void testDataStreamResumeAfterPauseSendsData() { + boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz"); + bytestream->setDataComplete(false); + testling->setBlockSize(3); + testling->start(); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + + bytestream->addData(createByteArray("xyz")); + + CPPUNIT_ASSERT_EQUAL(5, static_cast<int>(stanzaChannel->sentStanzas.size())); + } + + void testDataStreamResumeBeforePauseDoesNotSendData() { + boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz"); + bytestream->setDataComplete(false); + testling->setBlockSize(3); + testling->start(); + stanzaChannel->onIQReceived(createIBBResult()); + + bytestream->addData(createByteArray("xyz")); + + CPPUNIT_ASSERT_EQUAL(2, static_cast<int>(stanzaChannel->sentStanzas.size())); + } + + void testDataStreamResumeAfterResumeDoesNotSendData() { + boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz"); + bytestream->setDataComplete(false); + testling->setBlockSize(3); + testling->start(); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + + bytestream->addData(createByteArray("xyz")); + bytestream->addData(createByteArray("xuv")); + + CPPUNIT_ASSERT_EQUAL(5, static_cast<int>(stanzaChannel->sentStanzas.size())); + } + private: IQ::ref createIBBResult() { return IQ::createResult(JID("baz@fum.com/dum"), stanzaChannel->sentStanzas[stanzaChannel->sentStanzas.size()-1]->getTo(), stanzaChannel->sentStanzas[stanzaChannel->sentStanzas.size()-1]->getID(), boost::shared_ptr<IBB>()); -- cgit v0.10.2-6-g49f6