summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'Swiften/FileTransfer')
-rw-r--r--Swiften/FileTransfer/ByteArrayReadBytestream.h15
-rw-r--r--Swiften/FileTransfer/IBBSendSession.cpp42
-rw-r--r--Swiften/FileTransfer/IBBSendSession.h4
-rw-r--r--Swiften/FileTransfer/ReadBytestream.h10
-rw-r--r--Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp67
5 files changed, 121 insertions, 17 deletions
diff --git a/Swiften/FileTransfer/ByteArrayReadBytestream.h b/Swiften/FileTransfer/ByteArrayReadBytestream.h
index a6945c3..6cbdef0 100644
--- a/Swiften/FileTransfer/ByteArrayReadBytestream.h
+++ b/Swiften/FileTransfer/ByteArrayReadBytestream.h
@@ -8,12 +8,13 @@
#include <vector>
+#include <Swiften/Base/Algorithm.h>
#include <Swiften/FileTransfer/ReadBytestream.h>
namespace Swift {
class ByteArrayReadBytestream : public ReadBytestream {
public:
- ByteArrayReadBytestream(const std::vector<unsigned char>& data) : data(data), position(0) {
+ ByteArrayReadBytestream(const std::vector<unsigned char>& data) : data(data), position(0), dataComplete(true) {
}
virtual std::vector<unsigned char> read(size_t size) {
@@ -29,11 +30,21 @@ namespace Swift {
}
virtual bool isFinished() const {
- return position >= data.size();
+ return position >= data.size() && dataComplete;
+ }
+
+ virtual void setDataComplete(bool b) {
+ dataComplete = b;
+ }
+
+ void addData(const std::vector<unsigned char>& moreData) {
+ append(data, moreData);
+ onDataAvailable();
}
private:
std::vector<unsigned char> data;
size_t position;
+ bool dataComplete;
};
}
diff --git a/Swiften/FileTransfer/IBBSendSession.cpp b/Swiften/FileTransfer/IBBSendSession.cpp
index b925c5f..3a1390c 100644
--- a/Swiften/FileTransfer/IBBSendSession.cpp
+++ b/Swiften/FileTransfer/IBBSendSession.cpp
@@ -14,10 +14,12 @@
namespace Swift {
-IBBSendSession::IBBSendSession(const std::string& id, const JID& from, const JID& to, boost::shared_ptr<ReadBytestream> bytestream, IQRouter* router) : id(id), from(from), to(to), bytestream(bytestream), router(router), blockSize(4096), sequenceNumber(0), active(false) {
+IBBSendSession::IBBSendSession(const std::string& id, const JID& from, const JID& to, boost::shared_ptr<ReadBytestream> bytestream, IQRouter* router) : id(id), from(from), to(to), bytestream(bytestream), router(router), blockSize(4096), sequenceNumber(0), active(false), waitingForData(false) {
+ bytestream->onDataAvailable.connect(boost::bind(&IBBSendSession::handleDataAvailable, this));
}
IBBSendSession::~IBBSendSession() {
+ bytestream->onDataAvailable.disconnect(boost::bind(&IBBSendSession::handleDataAvailable, this));
}
void IBBSendSession::start() {
@@ -37,17 +39,7 @@ void IBBSendSession::stop() {
void IBBSendSession::handleIBBResponse(IBB::ref, ErrorPayload::ref error) {
if (!error && active) {
if (!bytestream->isFinished()) {
- try {
- std::vector<unsigned char> data = bytestream->read(blockSize);
- IBBRequest::ref request = IBBRequest::create(from, to, IBB::createIBBData(id, sequenceNumber, data), router);
- sequenceNumber++;
- request->onResponse.connect(boost::bind(&IBBSendSession::handleIBBResponse, this, _1, _2));
- request->send();
- onBytesSent(data.size());
- }
- catch (const BytestreamException&) {
- finish(FileTransferError(FileTransferError::ReadError));
- }
+ sendMoreData();
}
else {
finish(boost::optional<FileTransferError>());
@@ -58,9 +50,35 @@ void IBBSendSession::handleIBBResponse(IBB::ref, ErrorPayload::ref error) {
}
}
+void IBBSendSession::sendMoreData() {
+ try {
+ std::vector<unsigned char> data = bytestream->read(blockSize);
+ if (!data.empty()) {
+ waitingForData = false;
+ IBBRequest::ref request = IBBRequest::create(from, to, IBB::createIBBData(id, sequenceNumber, data), router);
+ sequenceNumber++;
+ request->onResponse.connect(boost::bind(&IBBSendSession::handleIBBResponse, this, _1, _2));
+ request->send();
+ onBytesSent(data.size());
+ }
+ else {
+ waitingForData = true;
+ }
+ }
+ catch (const BytestreamException&) {
+ finish(FileTransferError(FileTransferError::ReadError));
+ }
+}
+
void IBBSendSession::finish(boost::optional<FileTransferError> error) {
active = false;
onFinished(error);
}
+void IBBSendSession::handleDataAvailable() {
+ if (waitingForData) {
+ sendMoreData();
+ }
+}
+
}
diff --git a/Swiften/FileTransfer/IBBSendSession.h b/Swiften/FileTransfer/IBBSendSession.h
index 8e5ace1..abd217b 100644
--- a/Swiften/FileTransfer/IBBSendSession.h
+++ b/Swiften/FileTransfer/IBBSendSession.h
@@ -41,9 +41,12 @@ namespace Swift {
boost::signal<void (boost::optional<FileTransferError>)> onFinished;
boost::signal<void (int)> onBytesSent;
+
private:
void handleIBBResponse(IBB::ref, ErrorPayload::ref);
void finish(boost::optional<FileTransferError>);
+ void sendMoreData();
+ void handleDataAvailable();
private:
std::string id;
@@ -54,5 +57,6 @@ namespace Swift {
int blockSize;
int sequenceNumber;
bool active;
+ bool waitingForData;
};
}
diff --git a/Swiften/FileTransfer/ReadBytestream.h b/Swiften/FileTransfer/ReadBytestream.h
index 9e070f7..0e95f7b 100644
--- a/Swiften/FileTransfer/ReadBytestream.h
+++ b/Swiften/FileTransfer/ReadBytestream.h
@@ -7,7 +7,6 @@
#pragma once
#include <vector>
-#include <cstring>
#include <Swiften/Base/boost_bsignals.h>
@@ -15,10 +14,17 @@ namespace Swift {
class ReadBytestream {
public:
virtual ~ReadBytestream();
+
+ /**
+ * Return an empty vector if no more data is available.
+ * Use onDataAvailable signal for signaling there is data available again.
+ */
virtual std::vector<unsigned char> read(size_t size) = 0;
+
virtual bool isFinished() const = 0;
public:
- boost::signal<void (std::vector<unsigned char>)> onRead;
+ boost::signal<void ()> onDataAvailable;
+ boost::signal<void (const std::vector<unsigned char>&)> onRead;
};
}
diff --git a/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp b/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp
index c88635f..d12f99e 100644
--- a/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp
+++ b/Swiften/FileTransfer/UnitTest/IBBSendSessionTest.cpp
@@ -26,6 +26,11 @@ class IBBSendSessionTest : public CppUnit::TestFixture {
CPPUNIT_TEST(testErrorResponseFinishesWithError);
CPPUNIT_TEST(testStopDuringSessionCloses);
CPPUNIT_TEST(testStopAfterFinishedDoesNotClose);
+ CPPUNIT_TEST(testDataStreamPauseStopsSendingData);
+ CPPUNIT_TEST(testDataStreamResumeAfterPauseSendsData);
+ CPPUNIT_TEST(testDataStreamResumeBeforePauseDoesNotSendData);
+ CPPUNIT_TEST(testDataStreamResumeAfterResumeDoesNotSendData);
+
CPPUNIT_TEST_SUITE_END();
public:
@@ -33,6 +38,7 @@ class IBBSendSessionTest : public CppUnit::TestFixture {
stanzaChannel = new DummyStanzaChannel();
iqRouter = new IQRouter(stanzaChannel);
bytestream = boost::shared_ptr<ByteArrayReadBytestream>(new ByteArrayReadBytestream(createByteArray("abcdefg")));
+ finished = false;
}
void tearDown() {
@@ -136,7 +142,66 @@ class IBBSendSessionTest : public CppUnit::TestFixture {
CPPUNIT_ASSERT_EQUAL(2, static_cast<int>(stanzaChannel->sentStanzas.size()));
}
-
+
+ void testDataStreamPauseStopsSendingData() {
+ boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz");
+ bytestream->setDataComplete(false);
+ testling->setBlockSize(3);
+ testling->start();
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+
+ CPPUNIT_ASSERT(!finished);
+ CPPUNIT_ASSERT(!error);
+
+ CPPUNIT_ASSERT_EQUAL(4, static_cast<int>(stanzaChannel->sentStanzas.size()));
+ }
+
+ void testDataStreamResumeAfterPauseSendsData() {
+ boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz");
+ bytestream->setDataComplete(false);
+ testling->setBlockSize(3);
+ testling->start();
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+
+ bytestream->addData(createByteArray("xyz"));
+
+ CPPUNIT_ASSERT_EQUAL(5, static_cast<int>(stanzaChannel->sentStanzas.size()));
+ }
+
+ void testDataStreamResumeBeforePauseDoesNotSendData() {
+ boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz");
+ bytestream->setDataComplete(false);
+ testling->setBlockSize(3);
+ testling->start();
+ stanzaChannel->onIQReceived(createIBBResult());
+
+ bytestream->addData(createByteArray("xyz"));
+
+ CPPUNIT_ASSERT_EQUAL(2, static_cast<int>(stanzaChannel->sentStanzas.size()));
+ }
+
+ void testDataStreamResumeAfterResumeDoesNotSendData() {
+ boost::shared_ptr<IBBSendSession> testling = createSession("foo@bar.com/baz");
+ bytestream->setDataComplete(false);
+ testling->setBlockSize(3);
+ testling->start();
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+ stanzaChannel->onIQReceived(createIBBResult());
+
+ bytestream->addData(createByteArray("xyz"));
+ bytestream->addData(createByteArray("xuv"));
+
+ CPPUNIT_ASSERT_EQUAL(5, static_cast<int>(stanzaChannel->sentStanzas.size()));
+ }
+
private:
IQ::ref createIBBResult() {
return IQ::createResult(JID("baz@fum.com/dum"), stanzaChannel->sentStanzas[stanzaChannel->sentStanzas.size()-1]->getTo(), stanzaChannel->sentStanzas[stanzaChannel->sentStanzas.size()-1]->getID(), boost::shared_ptr<IBB>());