diff options
author | Remko Tronçon <git@el-tramo.be> | 2011-09-29 20:21:01 (GMT) |
---|---|---|
committer | Remko Tronçon <git@el-tramo.be> | 2011-09-29 20:21:01 (GMT) |
commit | eb25d910e7991503eaa74233f0b396648f512e88 (patch) | |
tree | caeb9c22e97991be73235285c53fc93197a5eebb | |
parent | bea648fcd6bce4f0d7f19725f60e5b2b3ef0a340 (diff) | |
download | swift-contrib-eb25d910e7991503eaa74233f0b396648f512e88.zip swift-contrib-eb25d910e7991503eaa74233f0b396648f512e88.tar.bz2 |
Allow ReadBytestreams to be resumed.
-rw-r--r-- | Swiften/FileTransfer/ByteArrayReadBytestream.h | 15 | ||||
-rw-r--r-- | Swiften/FileTransfer/IBBSendSession.cpp | 42 | ||||
-rw-r--r-- | Swiften/FileTransfer/IBBSendSession.h | 4 | ||||
-rw-r--r-- | Swiften/FileTransfer/ReadBytestream.h | 10 | ||||
-rw-r--r-- | Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp | 67 |
5 files changed, 121 insertions, 17 deletions
diff --git a/Swiften/FileTransfer/ByteArrayReadBytestream.h b/Swiften/FileTransfer/ByteArrayReadBytestream.h index a6945c3..6cbdef0 100644 --- a/Swiften/FileTransfer/ByteArrayReadBytestream.h +++ b/Swiften/FileTransfer/ByteArrayReadBytestream.h @@ -2,38 +2,49 @@ * Copyright (c) 2010 Remko Tronçon * Licensed under the GNU General Public License v3. * See Documentation/Licenses/GPLv3.txt for more information. */ #pragma once #include <vector> +#include <Swiften/Base/Algorithm.h> #include <Swiften/FileTransfer/ReadBytestream.h> namespace Swift { class ByteArrayReadBytestream : public ReadBytestream { public: - ByteArrayReadBytestream(const std::vector<unsigned char>& data) : data(data), position(0) { + ByteArrayReadBytestream(const std::vector<unsigned char>& data) : data(data), position(0), dataComplete(true) { } virtual std::vector<unsigned char> read(size_t size) { size_t readSize = size; if (position + readSize > data.size()) { readSize = data.size() - position; } std::vector<unsigned char> result(data.begin() + position, data.begin() + position + readSize); onRead(result); position += readSize; return result; } virtual bool isFinished() const { - return position >= data.size(); + return position >= data.size() && dataComplete; + } + + virtual void setDataComplete(bool b) { + dataComplete = b; + } + + void addData(const std::vector<unsigned char>& moreData) { + append(data, moreData); + onDataAvailable(); } private: std::vector<unsigned char> data; size_t position; + bool dataComplete; }; } diff --git a/Swiften/FileTransfer/IBBSendSession.cpp b/Swiften/FileTransfer/IBBSendSession.cpp index b925c5f..3a1390c 100644 --- a/Swiften/FileTransfer/IBBSendSession.cpp +++ b/Swiften/FileTransfer/IBBSendSession.cpp @@ -8,22 +8,24 @@ #include <boost/bind.hpp> #include <Swiften/Queries/IQRouter.h> #include <Swiften/FileTransfer/IBBRequest.h> #include <Swiften/FileTransfer/BytestreamException.h> namespace Swift { -IBBSendSession::IBBSendSession(const std::string& id, const JID& from, const JID& to, boost::shared_ptr<ReadBytestream> bytestream, IQRouter* router) : id(id), from(from), to(to), bytestream(bytestream), router(router), blockSize(4096), sequenceNumber(0), active(false) { +IBBSendSession::IBBSendSession(const std::string& id, const JID& from, const JID& to, boost::shared_ptr<ReadBytestream> bytestream, IQRouter* router) : id(id), from(from), to(to), bytestream(bytestream), router(router), blockSize(4096), sequenceNumber(0), active(false), waitingForData(false) { + bytestream->onDataAvailable.connect(boost::bind(&IBBSendSession::handleDataAvailable, this)); } IBBSendSession::~IBBSendSession() { + bytestream->onDataAvailable.disconnect(boost::bind(&IBBSendSession::handleDataAvailable, this)); } void IBBSendSession::start() { IBBRequest::ref request = IBBRequest::create(from, to, IBB::createIBBOpen(id, blockSize), router); request->onResponse.connect(boost::bind(&IBBSendSession::handleIBBResponse, this, _1, _2)); active = true; request->send(); } @@ -31,36 +33,52 @@ void IBBSendSession::stop() { if (active && router->isAvailable()) { IBBRequest::create(from, to, IBB::createIBBClose(id), router)->send(); } finish(boost::optional<FileTransferError>()); } void IBBSendSession::handleIBBResponse(IBB::ref, ErrorPayload::ref error) { if (!error && active) { if (!bytestream->isFinished()) { - try { - std::vector<unsigned char> data = bytestream->read(blockSize); - IBBRequest::ref request = IBBRequest::create(from, to, IBB::createIBBData(id, sequenceNumber, data), router); - sequenceNumber++; - request->onResponse.connect(boost::bind(&IBBSendSession::handleIBBResponse, this, _1, _2)); - request->send(); - onBytesSent(data.size()); - } - catch (const BytestreamException&) { - finish(FileTransferError(FileTransferError::ReadError)); - } + sendMoreData(); } else { finish(boost::optional<FileTransferError>()); } } else { finish(FileTransferError(FileTransferError::PeerError)); } } +void IBBSendSession::sendMoreData() { + try { + std::vector<unsigned char> data = bytestream->read(blockSize); + if (!data.empty()) { + waitingForData = false; + IBBRequest::ref request = IBBRequest::create(from, to, IBB::createIBBData(id, sequenceNumber, data), router); + sequenceNumber++; + request->onResponse.connect(boost::bind(&IBBSendSession::handleIBBResponse, this, _1, _2)); + request->send(); + onBytesSent(data.size()); + } + else { + waitingForData = true; + } + } + catch (const BytestreamException&) { + finish(FileTransferError(FileTransferError::ReadError)); + } +} + void IBBSendSession::finish(boost::optional<FileTransferError> error) { active = false; onFinished(error); } +void IBBSendSession::handleDataAvailable() { + if (waitingForData) { + sendMoreData(); + } +} + } diff --git a/Swiften/FileTransfer/IBBSendSession.h b/Swiften/FileTransfer/IBBSendSession.h index 8e5ace1..abd217b 100644 --- a/Swiften/FileTransfer/IBBSendSession.h +++ b/Swiften/FileTransfer/IBBSendSession.h @@ -35,24 +35,28 @@ namespace Swift { return to; } void setBlockSize(int blockSize) { this->blockSize = blockSize; } boost::signal<void (boost::optional<FileTransferError>)> onFinished; boost::signal<void (int)> onBytesSent; + private: void handleIBBResponse(IBB::ref, ErrorPayload::ref); void finish(boost::optional<FileTransferError>); + void sendMoreData(); + void handleDataAvailable(); private: std::string id; JID from; JID to; boost::shared_ptr<ReadBytestream> bytestream; IQRouter* router; int blockSize; int sequenceNumber; bool active; + bool waitingForData; }; } diff --git a/Swiften/FileTransfer/ReadBytestream.h b/Swiften/FileTransfer/ReadBytestream.h index 9e070f7..0e95f7b 100644 --- a/Swiften/FileTransfer/ReadBytestream.h +++ b/Swiften/FileTransfer/ReadBytestream.h @@ -1,24 +1,30 @@ /* * Copyright (c) 2010 Remko Tronçon * Licensed under the GNU General Public License v3. * See Documentation/Licenses/GPLv3.txt for more information. */ #pragma once #include <vector> -#include <cstring> #include <Swiften/Base/boost_bsignals.h> namespace Swift { class ReadBytestream { public: virtual ~ReadBytestream(); + + /** + * Return an empty vector if no more data is available. + * Use onDataAvailable signal for signaling there is data available again. + */ virtual std::vector<unsigned char> read(size_t size) = 0; + virtual bool isFinished() const = 0; public: - boost::signal<void (std::vector<unsigned char>)> onRead; + boost::signal<void ()> onDataAvailable; + boost::signal<void (const std::vector<unsigned char>&)> onRead; }; } diff --git a/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp b/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp index c88635f..d12f99e 100644 --- a/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp +++ b/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp @@ -20,25 +20,31 @@ using namespace Swift; class IBBSendSessionTest : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(IBBSendSessionTest); CPPUNIT_TEST(testStart); CPPUNIT_TEST(testStart_ResponseStartsSending); CPPUNIT_TEST(testResponseContinuesSending); CPPUNIT_TEST(testRespondToAllFinishes); CPPUNIT_TEST(testErrorResponseFinishesWithError); CPPUNIT_TEST(testStopDuringSessionCloses); CPPUNIT_TEST(testStopAfterFinishedDoesNotClose); + CPPUNIT_TEST(testDataStreamPauseStopsSendingData); + CPPUNIT_TEST(testDataStreamResumeAfterPauseSendsData); + CPPUNIT_TEST(testDataStreamResumeBeforePauseDoesNotSendData); + CPPUNIT_TEST(testDataStreamResumeAfterResumeDoesNotSendData); + CPPUNIT_TEST_SUITE_END(); public: void setUp() { stanzaChannel = new DummyStanzaChannel(); iqRouter = new IQRouter(stanzaChannel); bytestream = boost::shared_ptr<ByteArrayReadBytestream>(new ByteArrayReadBytestream(createByteArray("abcdefg"))); + finished = false; } void tearDown() { delete iqRouter; delete stanzaChannel; } void testStart() { boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz"); @@ -130,19 +136,78 @@ class IBBSendSessionTest : public CppUnit::TestFixture { testling->start(); stanzaChannel->onIQReceived(createIBBResult()); stanzaChannel->onIQReceived(createIBBResult()); CPPUNIT_ASSERT(finished); testling->stop(); CPPUNIT_ASSERT_EQUAL(2, static_cast<int>(stanzaChannel->sentStanzas.size())); } - + + void testDataStreamPauseStopsSendingData() { + boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz"); + bytestream->setDataComplete(false); + testling->setBlockSize(3); + testling->start(); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + + CPPUNIT_ASSERT(!finished); + CPPUNIT_ASSERT(!error); + + CPPUNIT_ASSERT_EQUAL(4, static_cast<int>(stanzaChannel->sentStanzas.size())); + } + + void testDataStreamResumeAfterPauseSendsData() { + boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz"); + bytestream->setDataComplete(false); + testling->setBlockSize(3); + testling->start(); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + + bytestream->addData(createByteArray("xyz")); + + CPPUNIT_ASSERT_EQUAL(5, static_cast<int>(stanzaChannel->sentStanzas.size())); + } + + void testDataStreamResumeBeforePauseDoesNotSendData() { + boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz"); + bytestream->setDataComplete(false); + testling->setBlockSize(3); + testling->start(); + stanzaChannel->onIQReceived(createIBBResult()); + + bytestream->addData(createByteArray("xyz")); + + CPPUNIT_ASSERT_EQUAL(2, static_cast<int>(stanzaChannel->sentStanzas.size())); + } + + void testDataStreamResumeAfterResumeDoesNotSendData() { + boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz"); + bytestream->setDataComplete(false); + testling->setBlockSize(3); + testling->start(); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + stanzaChannel->onIQReceived(createIBBResult()); + + bytestream->addData(createByteArray("xyz")); + bytestream->addData(createByteArray("xuv")); + + CPPUNIT_ASSERT_EQUAL(5, static_cast<int>(stanzaChannel->sentStanzas.size())); + } + private: IQ::ref createIBBResult() { return IQ::createResult(JID("baz@fum.com/dum"), stanzaChannel->sentStanzas[stanzaChannel->sentStanzas.size()-1]->getTo(), stanzaChannel->sentStanzas[stanzaChannel->sentStanzas.size()-1]->getID(), boost::shared_ptr<IBB>()); } private: boost::shared_ptr<IBBSendSession> createSession(const std::string& to) { boost::shared_ptr<IBBSendSession> session(new IBBSendSession("myid", JID(), JID(to), bytestream, iqRouter)); session->onFinished.connect(boost::bind(&IBBSendSessionTest::handleFinished, this, _1)); |