diff options
authorAlex Clayton <>2016-02-15 17:24:49 (GMT)
committerAlex Clayton <>2016-02-29 10:39:49 (GMT)
commit3bfe54c141dd3fa20e391312a0a84c75731e2b2a (patch)
tree4de10e954ff540fa6e1b70c8735ac5da14b19743 /src/com/isode/stroke/network/
parent2ebf488dfee7156fbbe0b3d3eccebe13d86a8634 (diff)
Add Network Bosh Classes
Add the missing Bosh classes to the network packages (BoshConnection and BoshConnectionPool), plus tests for the classes and any other classes required by the new classes. Test-information: Units tests all pass ok. Change-Id: I5c2e05bae9e678ac10d2601c7fdbdccd68d66b71
Diffstat (limited to 'src/com/isode/stroke/network/')
1 files changed, 417 insertions, 0 deletions
diff --git a/src/com/isode/stroke/network/ b/src/com/isode/stroke/network/
new file mode 100644
index 0000000..9eefb83
--- /dev/null
+++ b/src/com/isode/stroke/network/
@@ -0,0 +1,417 @@
+/* Copyright (c) 2016, Isode Limited, London, England.
+ * All rights reserved.
+ *
+ * Acquisition and use of this software and related materials for any
+ * purpose requires a written license agreement from Isode Limited,
+ * or a written license from an organisation licensed by Isode Limited
+ * to grant such a license.
+ *
+ */
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Logger;
+import com.isode.stroke.base.SafeByteArray;
+import com.isode.stroke.base.SafeString;
+import com.isode.stroke.base.URL;
+import com.isode.stroke.eventloop.EventLoop;
+import com.isode.stroke.parser.XMLParserFactory;
+import com.isode.stroke.signals.Signal;
+import com.isode.stroke.signals.Signal1;
+import com.isode.stroke.signals.SignalConnection;
+import com.isode.stroke.signals.Slot1;
+import com.isode.stroke.signals.Slot2;
+import com.isode.stroke.tls.Certificate;
+import com.isode.stroke.tls.CertificateVerificationError;
+import com.isode.stroke.tls.CertificateWithKey;
+import com.isode.stroke.tls.TLSContextFactory;
+import com.isode.stroke.tls.TLSOptions;
+public class BOSHConnectionPool {
+ private final URL boshURL_;
+ private ConnectionFactory connectionFactory_;
+ private final XMLParserFactory xmlParserFactory_;
+ private final TimerFactory timerFactory_;
+ private final List<BOSHConnection> connections_ = new ArrayList<BOSHConnection>();
+ private final Map<BOSHConnection, Set<SignalConnection>> connectionsSignalConnections_
+ = new HashMap<BOSHConnection, Set<SignalConnection>>();
+ private String sid_ = "";
+ private long rid_;
+ private final List<SafeByteArray> dataQueue_ = new ArrayList<SafeByteArray>();
+ private boolean pendingTerminate_;
+ private String to_;
+ private int requestLimit_;
+ private int restartCount_;
+ private boolean pendingRestart_;
+ private List<ConnectionFactory> myConnectionFactories_;
+ private final CachingDomainNameResolver resolver_;
+ private CertificateWithKey clientCertificate_;
+ private TLSContextFactory tlsContextFactory_;
+ private TLSOptions tlsOptions_;
+ private final List<Certificate> pinnedCertificateChain_ = new ArrayList<Certificate>();
+ private CertificateVerificationError lastVerificationError_;
+ public final Signal1<BOSHError> onSessionTerminated = new Signal1<BOSHError>();
+ public final Signal onSessionStarted = new Signal();
+ public final Signal1<SafeByteArray> onXMPPDataRead = new Signal1<SafeByteArray>();
+ public final Signal1<SafeByteArray> onBOSHDataRead = new Signal1<SafeByteArray>();
+ public final Signal1<SafeByteArray> onBOSHDataWritten = new Signal1<SafeByteArray>();
+ private final Logger logger = Logger.getLogger(this.getClass().getName());
+ public BOSHConnectionPool(URL boshURL,DomainNameResolver resolver,
+ ConnectionFactory connectionFactory, XMLParserFactory parserFactory,
+ TLSContextFactory tlsFactory, TimerFactory timerFactory, EventLoop eventLoop,
+ String to,long initialRID,URL boshHTTPConnectProxyURL,
+ SafeString boshHTTPConnectProxyAuthID,SafeString boshHTTPConnectProxyAuthPassword,
+ TLSOptions tlsOptions) {
+ this(boshURL, resolver, connectionFactory, parserFactory, tlsFactory, timerFactory,
+ eventLoop, to, initialRID, boshHTTPConnectProxyURL, boshHTTPConnectProxyAuthID,
+ boshHTTPConnectProxyAuthPassword, tlsOptions, null);
+ }
+ public BOSHConnectionPool(URL boshURL,DomainNameResolver realResolver,
+ ConnectionFactory connectionFactory, XMLParserFactory parserFactory,
+ TLSContextFactory tlsFactory, TimerFactory timerFactory, EventLoop eventLoop,
+ String to,long initialRID,URL boshHTTPConnectProxyURL,
+ SafeString boshHTTPConnectProxyAuthID,SafeString boshHTTPConnectProxyAuthPassword,
+ TLSOptions tlsOptions,HTTPTrafficFilter trafficFilter) {
+ boshURL_ = boshURL;
+ connectionFactory_ = connectionFactory;
+ xmlParserFactory_ = parserFactory;
+ timerFactory_ = timerFactory;
+ rid_ = initialRID;
+ pendingTerminate_ = false;
+ to_ = to;
+ requestLimit_ = 2;
+ restartCount_ = 0;
+ pendingRestart_ = false;
+ tlsContextFactory_ = tlsFactory;
+ tlsOptions_ = tlsOptions;
+ if (!boshHTTPConnectProxyURL.isEmpty()) {
+ this.connectionFactory_ =
+ new HTTPConnectProxiedConnectionFactory(realResolver, connectionFactory,
+ timerFactory, boshHTTPConnectProxyURL.getHost(),
+ URL.getPortOrDefaultPort(boshHTTPConnectProxyURL),
+ boshHTTPConnectProxyAuthID.getData(),
+ boshHTTPConnectProxyAuthPassword.getData(), trafficFilter);
+ }
+ resolver_ = new CachingDomainNameResolver(realResolver, eventLoop);
+ }
+ public void open() {
+ createConnection();
+ }
+ public void write(SafeByteArray data) {
+ dataQueue_.add(data);
+ tryToSendQueuedData();
+ }
+ public void writeFooter() {
+ pendingTerminate_ = true;
+ tryToSendQueuedData();
+ }
+ public void close() {
+ if (!sid_.isEmpty()) {
+ writeFooter();
+ }
+ else {
+ pendingTerminate_ = true;
+ List<BOSHConnection> connectionCopies = new ArrayList<BOSHConnection>(connections_);
+ for(BOSHConnection connection : connectionCopies) {
+ if (connection != null) {
+ connection.disconnect();
+ }
+ }
+ }
+ }
+ public void restartStream() {
+ BOSHConnection connection = getSuitableConnection();
+ if (connection != null) {
+ pendingRestart_ = false;
+ rid_++;
+ connection.setRID(rid_);
+ connection.restartStream();
+ restartCount_++;
+ }
+ else {
+ pendingRestart_ = true;
+ }
+ }
+ public void setTLSCertificate(CertificateWithKey certWithKey) {
+ clientCertificate_ = certWithKey;
+ }
+ public boolean isTLSEncrypted() {
+ return !pinnedCertificateChain_.isEmpty();
+ }
+ public Certificate getPeerCertificate() {
+ Certificate peerCertificate = null;
+ if (!pinnedCertificateChain_.isEmpty()) {
+ peerCertificate = pinnedCertificateChain_.get(0);
+ }
+ return peerCertificate;
+ }
+ public List<Certificate> getPeerCertificateChain() {
+ return new ArrayList<Certificate>(pinnedCertificateChain_);
+ }
+ public CertificateVerificationError getPeerCertificateVerificationError() {
+ return lastVerificationError_;
+ }
+ private void handleDataRead(SafeByteArray data) {
+ onXMPPDataRead.emit(data);
+ tryToSendQueuedData(); // Will rebalance the connections
+ }
+ private void handleSessionStarted(String sid, int requests) {
+ sid_ = sid;
+ requestLimit_ = requests;
+ onSessionStarted.emit();
+ }
+ private void handleBOSHDataRead(SafeByteArray data) {
+ onBOSHDataRead.emit(data);
+ }
+ private void handleBOSHDataWritten(SafeByteArray data) {
+ onBOSHDataWritten.emit(data);
+ }
+ private void handleSessionTerminated(BOSHError error) {
+ onSessionTerminated.emit(error);
+ }
+ private void handleConnectFinished(boolean error, BOSHConnection connection) {
+ if (error) {
+ onSessionTerminated.emit(new BOSHError(BOSHError.Type.UndefinedCondition));
+ }
+ else {
+ if ((connection.getPeerCertificate() != null) && pinnedCertificateChain_.isEmpty()) {
+ pinnedCertificateChain_.clear();
+ pinnedCertificateChain_.addAll(connection.getPeerCertificateChain());
+ }
+ if (!pinnedCertificateChain_.isEmpty()) {
+ lastVerificationError_ = connection.getPeerCertificateVerficationError();
+ }
+ if (sid_.isEmpty()) {
+ connection.startStream(to_, rid_);
+ }
+ if (pendingRestart_) {
+ restartStream();
+ }
+ tryToSendQueuedData();
+ }
+ }
+ private void handleConnectionDisconnected(boolean error, BOSHConnection connection) {
+ destroyConnection(connection);
+ if (pendingTerminate_ && sid_.isEmpty() && connections_.isEmpty()) {
+ handleSessionTerminated(null);
+ }
+ else {
+ /* We might have just freed up a connection slot to send with */
+ tryToSendQueuedData();
+ }
+ }
+ private void handleHTTPError(String errorCode) {
+ handleSessionTerminated(new BOSHError(BOSHError.Type.UndefinedCondition));
+ }
+ private BOSHConnection createConnection() {
+ Connector connector = Connector.create(boshURL_.getHost(),
+ URL.getPortOrDefaultPort(boshURL_), null, resolver_,
+ connectionFactory_, timerFactory_);
+ final BOSHConnection connection = BOSHConnection.create(boshURL_, connector,
+ xmlParserFactory_, tlsContextFactory_, tlsOptions_);
+ Set<SignalConnection> signalConnections = new HashSet<SignalConnection>();
+ signalConnections.add(connection.onXMPPDataRead.connect(new Slot1<SafeByteArray>() {
+ @Override
+ public void call(SafeByteArray data) {
+ handleDataRead(data);
+ }
+ }));
+ signalConnections.add(connection.onSessionStarted.connect(new Slot2<String, Integer>() {
+ @Override
+ public void call(String sid, Integer requests) {
+ handleSessionStarted(sid, requests.intValue());
+ }
+ }));
+ signalConnections.add(connection.onBOSHDataRead.connect(new Slot1<SafeByteArray>() {
+ @Override
+ public void call(SafeByteArray data) {
+ handleBOSHDataRead(data);
+ }
+ }));
+ signalConnections.add(connection.onBOSHDataWritten.connect(new Slot1<SafeByteArray>() {
+ @Override
+ public void call(SafeByteArray data) {
+ handleBOSHDataWritten(data);
+ }
+ }));
+ signalConnections.add(connection.onDisconnected.connect(new Slot1<Boolean>() {
+ @Override
+ public void call(Boolean wasError) {
+ handleConnectionDisconnected(wasError.booleanValue(), connection);
+ }
+ }));
+ signalConnections.add(connection.onConnectionFinished.connect(new Slot1<Boolean>() {
+ @Override
+ public void call(Boolean wasError) {
+ handleConnectFinished(wasError.booleanValue(), connection);
+ }
+ }));
+ signalConnections.add(connection.onSessionTerminated.connect(new Slot1<BOSHConnection.BOSHError>() {
+ @Override
+ public void call(BOSHError error) {
+ handleSessionTerminated(error);
+ }
+ }));
+ signalConnections.add(connection.onHTTPError.connect(new Slot1<String>() {
+ @Override
+ public void call(String httpErrorCode) {
+ handleHTTPError(httpErrorCode);
+ }
+ }));
+ if ("https".equals(boshURL_.getScheme())) {
+ boolean success = connection.setClientCertificate(clientCertificate_);
+ logger.fine("setClientCertificate, success: " + success + "\n");
+ }
+ connection.connect();
+ connections_.add(connection);
+ connectionsSignalConnections_.put(connection, signalConnections);
+ return connection;
+ }
+ private void destroyConnection(BOSHConnection connection) {
+ while (connections_.remove(connection)) {
+ // Loop will run till all instances of connection are removed
+ }
+ Set<SignalConnection> signalConnections = connectionsSignalConnections_.remove(connection);
+ if (signalConnections != null) {
+ for (SignalConnection signalConnection : signalConnections) {
+ if (signalConnection != null) {
+ signalConnection.disconnect();
+ }
+ }
+ signalConnections.clear();
+ }
+ }
+ private void tryToSendQueuedData() {
+ if (sid_.isEmpty()) {
+ // If we've not got as far as stream start yet, pend
+ return;
+ }
+ BOSHConnection suitableConnection = getSuitableConnection();
+ boolean toSend = !dataQueue_.isEmpty();
+ if (suitableConnection != null) {
+ if (toSend) {
+ rid_++;
+ suitableConnection.setRID(rid_);
+ SafeByteArray data = new SafeByteArray();
+ for(SafeByteArray datum : dataQueue_) {
+ data.append(datum);
+ }
+ suitableConnection.write(data);
+ dataQueue_.clear();
+ }
+ else if (pendingTerminate_) {
+ rid_++;
+ suitableConnection.setRID(rid_);
+ suitableConnection.terminateStream();
+ sid_ = "";
+ close();
+ }
+ }
+ if (!pendingTerminate_) {
+ // Ensure there's always a session waiting to read data for us
+ boolean pending = false;
+ for(BOSHConnection connection : connections_) {
+ if (connection != null && !connection.isReadyToSend()) {
+ pending = true;
+ }
+ }
+ if (!pending) {
+ if (restartCount_ >= 1) {
+ // Don't open a second connection until we've restarted the stream twice - i.e. we've authed and resource bound
+ if (suitableConnection != null) {
+ rid_++;
+ suitableConnection.setRID(rid_);
+ suitableConnection.write(new SafeByteArray());
+ }
+ else {
+ // My thought process I went through when writing this, to aid anyone else confused why this can happen...
+ //
+ // What to do here? I think this isn't possible.
+ // If you didn't have two connections, suitable would have made one.
+ // If you have two connections and neither is suitable, pending would be true.
+ // If you have a non-pending connection, it's suitable.
+ //
+ // If I decide to do something here, remove assert above.
+ //
+ // Ah! Yes, because there's a period between creating the connection and it being connected. */
+ }
+ }
+ }
+ }
+ }
+ private BOSHConnection getSuitableConnection() {
+ BOSHConnection suitableConnection = null;
+ for(BOSHConnection connection : connections_) {
+ if (connection.isReadyToSend()) {
+ suitableConnection = connection;
+ break;
+ }
+ }
+ if (suitableConnection == null && (connections_.size() < requestLimit_)) {
+ // This is not a suitable connection because it won't have yet connected and added TLS if needed.
+ BOSHConnection newConnection = createConnection();
+ newConnection.setSID(sid_);
+ }
+ assert(connections_.size() <= requestLimit_);
+ assert((suitableConnection == null) || suitableConnection.isReadyToSend());
+ return suitableConnection;
+ }