summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRemko Tronçon <git@el-tramo.be>2011-09-29 20:21:01 (GMT)
committerRemko Tronçon <git@el-tramo.be>2011-09-29 20:21:01 (GMT)
commiteb25d910e7991503eaa74233f0b396648f512e88 (patch)
treecaeb9c22e97991be73235285c53fc93197a5eebb
parentbea648fcd6bce4f0d7f19725f60e5b2b3ef0a340 (diff)
downloadswift-contrib-eb25d910e7991503eaa74233f0b396648f512e88.zip
swift-contrib-eb25d910e7991503eaa74233f0b396648f512e88.tar.bz2
Allow ReadBytestreams to be resumed.
-rw-r--r--Swiften/FileTransfer/ByteArrayReadBytestream.h15
-rw-r--r--Swiften/FileTransfer/IBBSendSession.cpp42
-rw-r--r--Swiften/FileTransfer/IBBSendSession.h4
-rw-r--r--Swiften/FileTransfer/ReadBytestream.h10
-rw-r--r--Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp67
5 files changed, 121 insertions, 17 deletions
diff --git a/Swiften/FileTransfer/ByteArrayReadBytestream.h b/Swiften/FileTransfer/ByteArrayReadBytestream.h
index a6945c3..6cbdef0 100644
--- a/Swiften/FileTransfer/ByteArrayReadBytestream.h
+++ b/Swiften/FileTransfer/ByteArrayReadBytestream.h
@@ -2,38 +2,49 @@
* Copyright (c) 2010 Remko Tronçon
* Licensed under the GNU General Public License v3.
* See Documentation/Licenses/GPLv3.txt for more information.
*/
#pragma once
#include <vector>
+#include <Swiften/Base/Algorithm.h>
#include <Swiften/FileTransfer/ReadBytestream.h>
namespace Swift {
class ByteArrayReadBytestream : public ReadBytestream {
public:
- ByteArrayReadBytestream(const std::vector<unsigned char>& data) : data(data), position(0) {
+ ByteArrayReadBytestream(const std::vector<unsigned char>& data) : data(data), position(0), dataComplete(true) {
}
virtual std::vector<unsigned char> read(size_t size) {
size_t readSize = size;
if (position + readSize > data.size()) {
readSize = data.size() - position;
}
std::vector<unsigned char> result(data.begin() + position, data.begin() + position + readSize);
onRead(result);
position += readSize;
return result;
}
virtual bool isFinished() const {
- return position >= data.size();
+ return position >= data.size() && dataComplete;
+ }
+
+ virtual void setDataComplete(bool b) {
+ dataComplete = b;
+ }
+
+ void addData(const std::vector<unsigned char>& moreData) {
+ append(data, moreData);
+ onDataAvailable();
}
private:
std::vector<unsigned char> data;
size_t position;
+ bool dataComplete;
};
}
diff --git a/Swiften/FileTransfer/IBBSendSession.cpp b/Swiften/FileTransfer/IBBSendSession.cpp
index b925c5f..3a1390c 100644
--- a/Swiften/FileTransfer/IBBSendSession.cpp
+++ b/Swiften/FileTransfer/IBBSendSession.cpp
@@ -8,22 +8,24 @@
#include <boost/bind.hpp>
#include <Swiften/Queries/IQRouter.h>
#include <Swiften/FileTransfer/IBBRequest.h>
#include <Swiften/FileTransfer/BytestreamException.h>
namespace Swift {
-IBBSendSession::IBBSendSession(const std::string& id, const JID& from, const JID& to, boost::shared_ptr<ReadBytestream> bytestream, IQRouter* router) : id(id), from(from), to(to), bytestream(bytestream), router(router), blockSize(4096), sequenceNumber(0), active(false) {
+IBBSendSession::IBBSendSession(const std::string& id, const JID& from, const JID& to, boost::shared_ptr<ReadBytestream> bytestream, IQRouter* router) : id(id), from(from), to(to), bytestream(bytestream), router(router), blockSize(4096), sequenceNumber(0), active(false), waitingForData(false) {
+ bytestream->onDataAvailable.connect(boost::bind(&IBBSendSession::handleDataAvailable, this));
}
IBBSendSession::~IBBSendSession() {
+ bytestream->onDataAvailable.disconnect(boost::bind(&IBBSendSession::handleDataAvailable, this));
}
void IBBSendSession::start() {
IBBRequest::ref request = IBBRequest::create(from, to, IBB::createIBBOpen(id, blockSize), router);
request->onResponse.connect(boost::bind(&IBBSendSession::handleIBBResponse, this, _1, _2));
active = true;
request->send();
}
@@ -31,36 +33,52 @@ void IBBSendSession::stop() {
if (active && router->isAvailable()) {
IBBRequest::create(from, to, IBB::createIBBClose(id), router)->send();
}
finish(boost::optional<FileTransferError>());
}
void IBBSendSession::handleIBBResponse(IBB::ref, ErrorPayload::ref error) {
if (!error && active) {
if (!bytestream->isFinished()) {
- try {
- std::vector<unsigned char> data = bytestream->read(blockSize);
- IBBRequest::ref request = IBBRequest::create(from, to, IBB::createIBBData(id, sequenceNumber, data), router);
- sequenceNumber++;
- request->onResponse.connect(boost::bind(&IBBSendSession::handleIBBResponse, this, _1, _2));
- request->send();
- onBytesSent(data.size());
- }
- catch (const BytestreamException&) {
- finish(FileTransferError(FileTransferError::ReadError));
- }
+ sendMoreData();
}
else {
finish(boost::optional<FileTransferError>());
}
}
else {
finish(FileTransferError(FileTransferError::PeerError));
}
}
+void IBBSendSession::sendMoreData() {
+ try {
+ std::vector<unsigned char> data = bytestream->read(blockSize);
+ if (!data.empty()) {
+ waitingForData = false;
+ IBBRequest::ref request = IBBRequest::create(from, to, IBB::createIBBData(id, sequenceNumber, data), router);
+ sequenceNumber++;
+ request->onResponse.connect(boost::bind(&IBBSendSession::handleIBBResponse, this, _1, _2));
+ request->send();
+ onBytesSent(data.size());
+ }
+ else {
+ waitingForData = true;
+ }
+ }
+ catch (const BytestreamException&) {
+ finish(FileTransferError(FileTransferError::ReadError));
+ }
+}
+
void IBBSendSession::finish(boost::optional<FileTransferError> error) {
active = false;
onFinished(error);
}
+void IBBSendSession::handleDataAvailable() {
+ if (waitingForData) {
+ sendMoreData();
+ }
+}
+
}
diff --git a/Swiften/FileTransfer/IBBSendSession.h b/Swiften/FileTransfer/IBBSendSession.h
index 8e5ace1..abd217b 100644
--- a/Swiften/FileTransfer/IBBSendSession.h
+++ b/Swiften/FileTransfer/IBBSendSession.h
@@ -35,24 +35,28 @@ namespace Swift {
return to;
}
void setBlockSize(int blockSize) {
this->blockSize = blockSize;
}
boost::signal<void (boost::optional<FileTransferError>)> onFinished;
boost::signal<void (int)> onBytesSent;
+
private:
void handleIBBResponse(IBB::ref, ErrorPayload::ref);
void finish(boost::optional<FileTransferError>);
+ void sendMoreData();
+ void handleDataAvailable();
private:
std::string id;
JID from;
JID to;
boost::shared_ptr<ReadBytestream> bytestream;
IQRouter* router;
int blockSize;
int sequenceNumber;
bool active;
+ bool waitingForData;
};
}
diff --git a/Swiften/FileTransfer/ReadBytestream.h b/Swiften/FileTransfer/ReadBytestream.h
index 9e070f7..0e95f7b 100644
--- a/Swiften/FileTransfer/ReadBytestream.h
+++ b/Swiften/FileTransfer/ReadBytestream.h
@@ -1,24 +1,30 @@
/*
* Copyright (c) 2010 Remko Tronçon
* Licensed under the GNU General Public License v3.
* See Documentation/Licenses/GPLv3.txt for more information.
*/
#pragma once
#include <vector>
-#include <cstring>
#include <Swiften/Base/boost_bsignals.h>
namespace Swift {
class ReadBytestream {
public:
virtual ~ReadBytestream();
+
+ /**
+ * Return an empty vector if no more data is available.
+ * Use onDataAvailable signal for signaling there is data available again.
+ */
virtual std::vector<unsigned char> read(size_t size) = 0;
+
virtual bool isFinished() const = 0;
public:
- boost::signal<void (std::vector<unsigned char>)> onRead;
+ boost::signal<void ()> onDataAvailable;
+ boost::signal<void (const std::vector<unsigned char>&)> onRead;
};
}
diff --git a/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp b/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp
index c88635f..d12f99e 100644
--- a/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp
+++ b/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp
@@ -20,25 +20,31 @@ using namespace Swift;
class IBBSendSessionTest : public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE(IBBSendSessionTest);
CPPUNIT_TEST(testStart);
CPPUNIT_TEST(testStart_ResponseStartsSending);
CPPUNIT_TEST(testResponseContinuesSending);
CPPUNIT_TEST(testRespondToAllFinishes);
CPPUNIT_TEST(testErrorResponseFinishesWithError);
CPPUNIT_TEST(testStopDuringSessionCloses);
CPPUNIT_TEST(testStopAfterFinishedDoesNotClose);
+ CPPUNIT_TEST(testDataStreamPauseStopsSendingData);
+ CPPUNIT_TEST(testDataStreamResumeAfterPauseSendsData);
+ CPPUNIT_TEST(testDataStreamResumeBeforePauseDoesNotSendData);
+ CPPUNIT_TEST(testDataStreamResumeAfterResumeDoesNotSendData);
+
CPPUNIT_TEST_SUITE_END();
public:
void setUp() {
stanzaChannel = new DummyStanzaChannel();
iqRouter = new IQRouter(stanzaChannel);
bytestream = boost::shared_ptr<ByteArrayReadBytestream>(new ByteArrayReadBytestream(createByteArray("abcdefg")));
+ finished = false;
}
void tearDown() {
delete iqRouter;
delete stanzaChannel;
}
void testStart() {
boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz");
@@ -130,19 +136,78 @@ class IBBSendSessionTest : public CppUnit::TestFixture {
testling->start();
stanzaChannel->onIQReceived(createIBBResult());
stanzaChannel->onIQReceived(createIBBResult());
CPPUNIT_ASSERT(finished);
testling->stop();
CPPUNIT_ASSERT_EQUAL(2, static_cast<int>(stanzaChannel->sentStanzas.size()));
}
-
+
+ void testDataStreamPauseStopsSendingData() {
+ boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz");
+ bytestream->setDataComplete(false);
+ testling->setBlockSize(3);
+ testling->start();
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+
+ CPPUNIT_ASSERT(!finished);
+ CPPUNIT_ASSERT(!error);
+
+ CPPUNIT_ASSERT_EQUAL(4, static_cast<int>(stanzaChannel->sentStanzas.size()));
+ }
+
+ void testDataStreamResumeAfterPauseSendsData() {
+ boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz");
+ bytestream->setDataComplete(false);
+ testling->setBlockSize(3);
+ testling->start();
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+
+ bytestream->addData(createByteArray("xyz"));
+
+ CPPUNIT_ASSERT_EQUAL(5, static_cast<int>(stanzaChannel->sentStanzas.size()));
+ }
+
+ void testDataStreamResumeBeforePauseDoesNotSendData() {
+ boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz");
+ bytestream->setDataComplete(false);
+ testling->setBlockSize(3);
+ testling->start();
+ stanzaChannel->onIQReceived(createIBBResult());
+
+ bytestream->addData(createByteArray("xyz"));
+
+ CPPUNIT_ASSERT_EQUAL(2, static_cast<int>(stanzaChannel->sentStanzas.size()));
+ }
+
+ void testDataStreamResumeAfterResumeDoesNotSendData() {
+ boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz");
+ bytestream->setDataComplete(false);
+ testling->setBlockSize(3);
+ testling->start();
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+
+ bytestream->addData(createByteArray("xyz"));
+ bytestream->addData(createByteArray("xuv"));
+
+ CPPUNIT_ASSERT_EQUAL(5, static_cast<int>(stanzaChannel->sentStanzas.size()));
+ }
+
private:
IQ::ref createIBBResult() {
return IQ::createResult(JID("baz@fum.com/dum"), stanzaChannel->sentStanzas[stanzaChannel->sentStanzas.size()-1]->getTo(), stanzaChannel->sentStanzas[stanzaChannel->sentStanzas.size()-1]->getID(), boost::shared_ptr<IBB>());
}
private:
boost::shared_ptr<IBBSendSession> createSession(const std::string& to) {
boost::shared_ptr<IBBSendSession> session(new IBBSendSession("myid", JID(), JID(to), bytestream, iqRouter));
session->onFinished.connect(boost::bind(&IBBSendSessionTest::handleFinished, this, _1));