summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'Swiften/Network/BOSHConnectionPool.cpp')
-rw-r--r--Swiften/Network/BOSHConnectionPool.cpp247
1 files changed, 247 insertions, 0 deletions
diff --git a/Swiften/Network/BOSHConnectionPool.cpp b/Swiften/Network/BOSHConnectionPool.cpp
new file mode 100644
index 0000000..6c3ba7e
--- /dev/null
+++ b/Swiften/Network/BOSHConnectionPool.cpp
@@ -0,0 +1,247 @@
+/*
+ * Copyright (c) 2011 Kevin Smith
+ * Licensed under the GNU General Public License v3.
+ * See Documentation/Licenses/GPLv3.txt for more information.
+ */
+#include <Swiften/Network/BOSHConnectionPool.h>
+
+#include <climits>
+
+#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
+
+#include <Swiften/Base/foreach.h>
+#include <Swiften/Base/SafeString.h>
+#include <Swiften/Network/TLSConnectionFactory.h>
+#include <Swiften/Network/HTTPConnectProxiedConnectionFactory.h>
+
+namespace Swift {
+BOSHConnectionPool::BOSHConnectionPool(boost::shared_ptr<BOSHConnectionFactory> connectionFactory, const std::string& to, long initialRID, const URL& boshHTTPConnectProxyURL, const SafeString& boshHTTPConnectProxyAuthID, const SafeString& boshHTTPConnectProxyAuthPassword)
+ : connectionFactory(connectionFactory),
+ rid(initialRID),
+ pendingTerminate(false),
+ to(to),
+ requestLimit(2),
+ restartCount(0),
+ pendingRestart(false) {
+ tlsConnectionFactory = NULL;
+ if (boshHTTPConnectProxyURL.empty()) {
+ connectProxyFactory = NULL;
+ }
+ else {
+ ConnectionFactory* rawFactory = connectionFactory->getRawConnectionFactory();
+ if (boshHTTPConnectProxyURL.getScheme() == "https") {
+ tlsConnectionFactory = new TLSConnectionFactory(connectionFactory->getTLSContextFactory(), rawFactory);
+ rawFactory = tlsConnectionFactory;
+ }
+ connectProxyFactory = new HTTPConnectProxiedConnectionFactory(rawFactory, HostAddressPort(HostAddress(boshHTTPConnectProxyURL.getHost()), boshHTTPConnectProxyURL.getPort()), boshHTTPConnectProxyAuthID, boshHTTPConnectProxyAuthPassword);
+ }
+ createConnection();
+}
+
+BOSHConnectionPool::~BOSHConnectionPool() {
+ close();
+ delete connectProxyFactory;
+ delete tlsConnectionFactory;
+}
+
+void BOSHConnectionPool::write(const SafeByteArray& data) {
+ dataQueue.push_back(data);
+ tryToSendQueuedData();
+}
+
+void BOSHConnectionPool::handleDataRead(const SafeByteArray& data) {
+ onXMPPDataRead(data);
+ tryToSendQueuedData(); /* Will rebalance the connections */
+}
+
+void BOSHConnectionPool::restartStream() {
+ BOSHConnection::ref connection = getSuitableConnection();
+ if (connection) {
+ pendingRestart = false;
+ rid++;
+ connection->setRID(rid);
+ connection->restartStream();
+ restartCount++;
+ }
+ else {
+ pendingRestart = true;
+ }
+}
+
+void BOSHConnectionPool::writeFooter() {
+ pendingTerminate = true;
+ tryToSendQueuedData();
+}
+
+void BOSHConnectionPool::close() {
+ /* TODO: Send a terminate here. */
+ std::vector<BOSHConnection::ref> connectionCopies = connections;
+ foreach (BOSHConnection::ref connection, connectionCopies) {
+ if (connection) {
+ connection->disconnect();
+ destroyConnection(connection);
+ }
+ }
+}
+
+void BOSHConnectionPool::handleSessionStarted(const std::string& sessionID, size_t requests) {
+ sid = sessionID;
+ requestLimit = requests;
+ onSessionStarted();
+}
+
+void BOSHConnectionPool::handleConnectFinished(bool error, BOSHConnection::ref connection) {
+ if (error) {
+ onSessionTerminated(boost::make_shared<BOSHError>(BOSHError::UndefinedCondition));
+ /*TODO: We can probably manage to not terminate the stream here and use the rid/ack retry
+ * logic to just swallow the error and try again (some number of tries).
+ */
+ }
+ else {
+ if (sid.empty()) {
+ connection->startStream(to, rid);
+ }
+ if (pendingRestart) {
+ restartStream();
+ }
+ tryToSendQueuedData();
+ }
+}
+
+BOSHConnection::ref BOSHConnectionPool::getSuitableConnection() {
+ BOSHConnection::ref suitableConnection;
+ foreach (BOSHConnection::ref connection, connections) {
+ if (connection->isReadyToSend()) {
+ suitableConnection = connection;
+ break;
+ }
+ }
+
+ if (!suitableConnection && connections.size() < requestLimit) {
+ /* This is not a suitable connection because it won't have yet connected and added TLS if needed. */
+ BOSHConnection::ref newConnection = createConnection();
+ newConnection->setSID(sid);
+ }
+ assert(connections.size() <= requestLimit);
+ assert((!suitableConnection) || suitableConnection->isReadyToSend());
+ return suitableConnection;
+}
+
+void BOSHConnectionPool::tryToSendQueuedData() {
+ if (sid.empty()) {
+ /* If we've not got as far as stream start yet, pend */
+ return;
+ }
+
+ BOSHConnection::ref suitableConnection = getSuitableConnection();
+ bool sent = false;
+ bool toSend = !dataQueue.empty();
+ if (suitableConnection) {
+ if (toSend) {
+ rid++;
+ suitableConnection->setRID(rid);
+ SafeByteArray data;
+ foreach (const SafeByteArray& datum, dataQueue) {
+ data.insert(data.end(), datum.begin(), datum.end());
+ }
+ suitableConnection->write(data);
+ sent = true;
+ dataQueue.clear();
+ }
+ else if (pendingTerminate) {
+ rid++;
+ suitableConnection->setRID(rid);
+ suitableConnection->terminateStream();
+ sent = true;
+ onSessionTerminated(boost::shared_ptr<BOSHError>());
+ }
+ }
+ if (!pendingTerminate) {
+ /* Ensure there's always a session waiting to read data for us */
+ bool pending = false;
+ foreach (BOSHConnection::ref connection, connections) {
+ if (connection && !connection->isReadyToSend()) {
+ pending = true;
+ }
+ }
+ if (!pending) {
+ if (restartCount >= 1) {
+ /* Don't open a second connection until we've restarted the stream twice - i.e. we've authed and resource bound.*/
+ if (suitableConnection) {
+ rid++;
+ suitableConnection->setRID(rid);
+ suitableConnection->write(createSafeByteArray(""));
+ }
+ else {
+ /* My thought process I went through when writing this, to aid anyone else confused why this can happen...
+ *
+ * What to do here? I think this isn't possible.
+ If you didn't have two connections, suitable would have made one.
+ If you have two connections and neither is suitable, pending would be true.
+ If you have a non-pending connection, it's suitable.
+
+ If I decide to do something here, remove assert above.
+
+ Ah! Yes, because there's a period between creating the connection and it being connected. */
+ }
+ }
+ }
+ }
+}
+
+void BOSHConnectionPool::handleHTTPError(const std::string& /*errorCode*/) {
+ handleSessionTerminated(boost::make_shared<BOSHError>(BOSHError::UndefinedCondition));
+}
+
+void BOSHConnectionPool::handleConnectionDisconnected(const boost::optional<Connection::Error>& error, BOSHConnection::ref connection) {
+ destroyConnection(connection);
+ if (false && error) {
+ handleSessionTerminated(boost::make_shared<BOSHError>(BOSHError::UndefinedCondition));
+ }
+ else {
+ /* We might have just freed up a connection slot to send with */
+ tryToSendQueuedData();
+ }
+}
+
+boost::shared_ptr<BOSHConnection> BOSHConnectionPool::createConnection() {
+ BOSHConnection::ref connection = boost::dynamic_pointer_cast<BOSHConnection>(connectionFactory->createConnection(connectProxyFactory));
+ connection->onXMPPDataRead.connect(boost::bind(&BOSHConnectionPool::handleDataRead, this, _1));
+ connection->onSessionStarted.connect(boost::bind(&BOSHConnectionPool::handleSessionStarted, this, _1, _2));
+ connection->onBOSHDataRead.connect(boost::bind(&BOSHConnectionPool::handleBOSHDataRead, this, _1));
+ connection->onBOSHDataWritten.connect(boost::bind(&BOSHConnectionPool::handleBOSHDataWritten, this, _1));
+ connection->onDisconnected.connect(boost::bind(&BOSHConnectionPool::handleConnectionDisconnected, this, _1, connection));
+ connection->onConnectFinished.connect(boost::bind(&BOSHConnectionPool::handleConnectFinished, this, _1, connection));
+ connection->onSessionTerminated.connect(boost::bind(&BOSHConnectionPool::handleSessionTerminated, this, _1));
+ connection->onHTTPError.connect(boost::bind(&BOSHConnectionPool::handleHTTPError, this, _1));
+ connection->connect(HostAddressPort(HostAddress("0.0.0.0"), 0));
+ connections.push_back(connection);
+ return connection;
+}
+
+void BOSHConnectionPool::destroyConnection(boost::shared_ptr<BOSHConnection> connection) {
+ connections.erase(std::remove(connections.begin(), connections.end(), connection), connections.end());
+ connection->onXMPPDataRead.disconnect(boost::bind(&BOSHConnectionPool::handleDataRead, this, _1));
+ connection->onSessionStarted.disconnect(boost::bind(&BOSHConnectionPool::handleSessionStarted, this, _1, _2));
+ connection->onBOSHDataRead.disconnect(boost::bind(&BOSHConnectionPool::handleBOSHDataRead, this, _1));
+ connection->onBOSHDataWritten.disconnect(boost::bind(&BOSHConnectionPool::handleBOSHDataWritten, this, _1));
+ connection->onDisconnected.disconnect(boost::bind(&BOSHConnectionPool::handleConnectionDisconnected, this, _1, connection));
+ connection->onConnectFinished.disconnect(boost::bind(&BOSHConnectionPool::handleConnectFinished, this, _1, connection));
+ connection->onSessionTerminated.disconnect(boost::bind(&BOSHConnectionPool::handleSessionTerminated, this, _1));
+ connection->onHTTPError.disconnect(boost::bind(&BOSHConnectionPool::handleHTTPError, this, _1));
+}
+
+void BOSHConnectionPool::handleSessionTerminated(BOSHError::ref error) {
+ onSessionTerminated(error);
+}
+
+void BOSHConnectionPool::handleBOSHDataRead(const SafeByteArray& data) {
+ onBOSHDataRead(data);
+}
+
+void BOSHConnectionPool::handleBOSHDataWritten(const SafeByteArray& data) {
+ onBOSHDataWritten(data);
+}
+
+}