summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/isode/stroke/network')
-rw-r--r--src/com/isode/stroke/network/BOSHConnection.java527
-rw-r--r--src/com/isode/stroke/network/BOSHConnectionPool.java417
2 files changed, 944 insertions, 0 deletions
diff --git a/src/com/isode/stroke/network/BOSHConnection.java b/src/com/isode/stroke/network/BOSHConnection.java
new file mode 100644
index 0000000..43b990d
--- /dev/null
+++ b/src/com/isode/stroke/network/BOSHConnection.java
@@ -0,0 +1,527 @@
+/* Copyright (c) 2016, Isode Limited, London, England.
+ * All rights reserved.
+ *
+ * Acquisition and use of this software and related materials for any
+ * purpose requires a written license agreement from Isode Limited,
+ * or a written license from an organisation licensed by Isode Limited
+ * to grant such a license.
+ *
+ */
+package com.isode.stroke.network;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.isode.stroke.base.ByteArray;
+import com.isode.stroke.base.Error;
+import com.isode.stroke.base.SafeByteArray;
+import com.isode.stroke.base.URL;
+import com.isode.stroke.network.BOSHConnection.BOSHError.Type;
+import com.isode.stroke.parser.BOSHBodyExtractor;
+import com.isode.stroke.parser.BOSHBodyExtractor.BOSHBody;
+import com.isode.stroke.parser.XMLParserFactory;
+import com.isode.stroke.session.SessionStream.SessionStreamError;
+import com.isode.stroke.signals.Signal1;
+import com.isode.stroke.signals.Signal2;
+import com.isode.stroke.signals.SignalConnection;
+import com.isode.stroke.signals.Slot;
+import com.isode.stroke.signals.Slot1;
+import com.isode.stroke.signals.Slot2;
+import com.isode.stroke.streamstack.DummyStreamLayer;
+import com.isode.stroke.streamstack.HighLayer;
+import com.isode.stroke.streamstack.TLSLayer;
+import com.isode.stroke.tls.Certificate;
+import com.isode.stroke.tls.CertificateVerificationError;
+import com.isode.stroke.tls.CertificateWithKey;
+import com.isode.stroke.tls.TLSContextFactory;
+import com.isode.stroke.tls.TLSError;
+import com.isode.stroke.tls.TLSOptions;
+
+public class BOSHConnection {
+
+ private final URL boshURL_;
+ private Connector connector_;
+ private final XMLParserFactory parserFactory_;
+ private Connection connection_;
+ private final HighLayer dummyLayer_;
+ private final TLSLayer tlsLayer_;
+ private String sid_ = "";
+ private boolean waitingForStartResponse_ = false;
+ private long rid_ = 0;
+ private final SafeByteArray buffer_ = new SafeByteArray();
+ private boolean pending_ = false;
+ private boolean connectionReady_ = false;
+
+ public final Signal1<Boolean> onConnectionFinished = new Signal1<Boolean>();
+ public final Signal1<Boolean> onDisconnected = new Signal1<Boolean>();
+ public final Signal1<BOSHError> onSessionTerminated = new Signal1<BOSHError>();
+ public final Signal2<String, Integer> onSessionStarted = new Signal2<String, Integer>();
+ public final Signal1<SafeByteArray> onXMPPDataRead = new Signal1<SafeByteArray>();
+ public final Signal1<SafeByteArray> onBOSHDataRead = new Signal1<SafeByteArray>();
+ public final Signal1<SafeByteArray> onBOSHDataWritten = new Signal1<SafeByteArray>();
+ public final Signal1<String> onHTTPError = new Signal1<String>();
+
+ private final Logger logger = Logger.getLogger(this.getClass().getName());
+ private SignalConnection onConnectFinishedConnection;
+
+ public static class BOSHError extends com.isode.stroke.session.SessionStream.SessionStreamError {
+
+ public enum Type {
+ BadRequest, HostGone, HostUnknown, ImproperAddressing,
+ InternalServerError, ItemNotFound, OtherRequest, PolicyViolation,
+ RemoteConnectionFailed,RemoteStreamError,SeeOtherURI,SystemShutdown,
+ UndefinedCondition,NoError;
+ }
+
+ public BOSHError(Type type) {
+ super(SessionStreamError.Type.ConnectionReadError);
+ type_ = type;
+ }
+
+ public Type getType() {
+ return type_;
+ }
+
+ private final Type type_;
+
+ }
+
+ public static class Pair<T1,T2> {
+ public final T1 first;
+ public final T2 second;
+
+ public Pair(T1 first,T2 second) {
+ this.first = first;
+ this.second = second;
+ }
+ }
+
+ private BOSHConnection(URL url,Connector connector,XMLParserFactory parserFactory,
+ TLSContextFactory tlsContextFactory,TLSOptions tlsOptions) {
+ boshURL_ = url;
+ connector_ = connector;
+ parserFactory_ = parserFactory;
+ if ("https".equals(boshURL_.getScheme())) {
+ tlsLayer_ = new TLSLayer(tlsContextFactory, tlsOptions);
+ dummyLayer_ = new DummyStreamLayer(tlsLayer_);
+ }
+ else {
+ tlsLayer_ = null;
+ dummyLayer_ = null;
+ }
+ }
+
+ public static BOSHConnection create(URL url,Connector connector,
+ XMLParserFactory parserFactory,TLSContextFactory tlsContextFactory,TLSOptions tlsOptions) {
+ return new BOSHConnection(url, connector, parserFactory, tlsContextFactory, tlsOptions);
+ }
+
+ public void connect() {
+ onConnectFinishedConnection = connector_.onConnectFinished.connect(new Slot2<Connection, Error>() {
+
+ @Override
+ public void call(Connection connection, Error error) {
+ handleConnectFinished(connection);
+ }
+
+ });
+ connector_.start();
+ }
+
+ public void disconnect() {
+ if (connection_ != null) {
+ connection_.disconnect();
+ sid_ = "";
+ }
+ else {
+ handleDisconnected(null);
+ }
+ }
+
+ public void write(SafeByteArray data) {
+ write(data,false,false);
+ }
+
+ private void write(SafeByteArray data, boolean streamRestart, boolean terminate) {
+ assert(connectionReady_);
+ assert(!sid_.isEmpty());
+
+ SafeByteArray safeHeader =
+ createHTTPRequest(data, streamRestart, terminate, rid_, sid_, boshURL_).first;
+
+ onBOSHDataWritten.emit(safeHeader);
+ writeData(safeHeader);
+ pending_ = true;
+
+ String logMessage = "write data: " + safeHeader.toString() + "\n";
+ logger.log(Level.FINE,logMessage);
+ }
+
+ public String getSID() {
+ return sid_;
+ }
+
+ public void setRID(long rid) {
+ rid_ = rid;
+ }
+
+ public void setSID(String sid) {
+ sid_ = sid;
+ }
+
+ public void startStream(String to,long rid) {
+ assert(connectionReady_);
+
+ String content = "<body content='text/xml; charset=utf-8'"
+ + " hold='1'"
+ + " to='" + to + "'"
+ + " rid='" + rid + "'"
+ + " ver='1.6'"
+ + " wait='60'"
+ + " xml:lang='en'"
+ + " xmlns:xmpp='urn:xmpp:bosh'"
+ + " xmpp:version='1.0'"
+ + " xmlns='http://jabber.org/protocol/httpbind' />";
+
+ StringBuilder headerBuilder = new StringBuilder("POST " + boshURL_.getPath() + " HTTP/1.1\r\n"
+ + "Host: " + boshURL_.getHost());
+ Integer boshPort = boshURL_.getPort();
+ if (boshPort != null) {
+ headerBuilder.append(":");
+ headerBuilder.append(boshPort);
+ }
+ headerBuilder.append("\r\n");
+ headerBuilder.append("Accept-Encoding: deflate\r\n");
+ headerBuilder.append("Content-Type: text/xml; charset=utf-8\r\n");
+ headerBuilder.append("Content-Length: ");
+ headerBuilder.append(content.length());
+ headerBuilder.append("\r\n\r\n");
+ headerBuilder.append(content);
+
+ waitingForStartResponse_ = true;
+ SafeByteArray safeHeader = new SafeByteArray(headerBuilder.toString());
+ onBOSHDataWritten.emit(safeHeader);
+ writeData(safeHeader);
+ logger.fine("write stream header: "+safeHeader.toString()+"\n");
+ }
+
+ public void terminateStream() {
+ write(new SafeByteArray(),false,true);
+ }
+
+ public boolean isReadyToSend() {
+ // Without pipelining you need to not send more without first receiving the response
+ // With pipelining you can. Assuming we can't, here
+ return connectionReady_ && !pending_ && !waitingForStartResponse_ && !sid_.isEmpty();
+ }
+
+ public void restartStream() {
+ write(new SafeByteArray(),true,false);
+ }
+
+ public boolean setClientCertificate(CertificateWithKey cert) {
+ if (tlsLayer_ != null) {
+ logger.fine("set client certificate\n");
+ return tlsLayer_.setClientCertificate(cert);
+ }
+ else {
+ return false;
+ }
+ }
+
+ public Certificate getPeerCertificate() {
+ if (tlsLayer_ != null) {
+ return tlsLayer_.getPeerCertificate();
+ }
+ return null;
+ }
+
+ public CertificateVerificationError getPeerCertificateVerficationError() {
+ if (tlsLayer_ != null) {
+ return tlsLayer_.getPeerCertificateVerificationError();
+ }
+ return null;
+ }
+
+ public List<Certificate> getPeerCertificateChain() {
+ if (tlsLayer_ != null) {
+ return tlsLayer_.getPeerCertificateChain();
+ }
+ return new ArrayList<Certificate>();
+ }
+
+ protected static Pair<SafeByteArray,Integer> createHTTPRequest(SafeByteArray data,
+ boolean streamRestart,boolean terminate,long rid,String sid,URL boshURL) {
+ int size = 0;
+ StringBuilder contentBuilder = new StringBuilder();
+ SafeByteArray contentTail = new SafeByteArray("</body>");
+ StringBuilder headerBuilder = new StringBuilder();
+
+ contentBuilder.append("<body rid='");
+ contentBuilder.append(rid);
+ contentBuilder.append("' sid='");
+ contentBuilder.append(sid);
+ contentBuilder.append("'");
+ if (streamRestart) {
+ contentBuilder.append(" xmpp:restart='true' xmlns:xmpp='urn:xmpp:xbosh'");
+ }
+ if (terminate) {
+ contentBuilder.append(" type='terminate'");
+ }
+ contentBuilder.append(" xmlns='http://jabber.org/protocol/httpbind'>");
+
+ SafeByteArray safeContent = new SafeByteArray(contentBuilder.toString());
+ safeContent.append(data);
+ safeContent.append(contentTail);
+
+ size = safeContent.getSize();
+
+ headerBuilder.append("POST " + boshURL.getPath() + " HTTP/1.1\r\n"
+ + "Host: " + boshURL.getHost());
+ Integer boshPort = boshURL.getPort();
+ if (boshPort != null) {
+ headerBuilder.append(":");
+ headerBuilder.append(boshPort);
+ }
+ headerBuilder.append("\r\n");
+ headerBuilder.append("Accept-Encoding: deflate\r\n");
+ headerBuilder.append("Content-Type: text/xml; charset=utf-8\r\n");
+ headerBuilder.append("Content-Length: ");
+ headerBuilder.append(size);
+ headerBuilder.append("\r\n\r\n");
+
+ SafeByteArray safeHeader = new SafeByteArray(headerBuilder.toString());
+ safeHeader.append(safeContent);
+
+ return new Pair<SafeByteArray, Integer>(safeHeader, size);
+ }
+
+ private void handleConnectFinished(Connection connection) {
+ cancelConnector();
+ connectionReady_ = (connection != null);
+ if (connectionReady_) {
+ connection_ = connection;
+ if (tlsLayer_ != null) {
+ connection_.onDataRead.connect(new Slot1<SafeByteArray>() {
+
+ @Override
+ public void call(SafeByteArray data) {
+ handleRawDataRead(data);
+ }
+
+ });
+ connection_.onDisconnected.connect(new Slot1<Connection.Error>() {
+
+ @Override
+ public void call(
+ com.isode.stroke.network.Connection.Error error) {
+ handleDisconnected(error);
+ }
+
+ });
+
+ tlsLayer_.getContext().onDataForNetwork.connect(new Slot1<SafeByteArray>() {
+
+ @Override
+ public void call(SafeByteArray data) {
+ handleTLSNetworkDataWriteRequest(data);
+ }
+
+ });
+ tlsLayer_.getContext().onDataForApplication.connect(new Slot1<SafeByteArray>() {
+
+ @Override
+ public void call(SafeByteArray data) {
+ handleTLSApplicationDataRead(data);
+ }
+
+ });
+
+ tlsLayer_.onConnected.connect(new Slot() {
+
+ @Override
+ public void call() {
+ handleTLSConnected();
+ }
+
+ });
+
+ tlsLayer_.onError.connect(new Slot1<TLSError>() {
+
+ @Override
+ public void call(TLSError error) {
+ handleTLSError(error);
+ }
+
+ });
+ tlsLayer_.connect();
+ }
+ else {
+ connection_.onDataRead.connect(new Slot1<SafeByteArray>() {
+
+ @Override
+ public void call(SafeByteArray data) {
+ handleDataRead(data);
+ }
+
+ });
+ connection_.onDisconnected.connect(new Slot1<Connection.Error>() {
+
+ @Override
+ public void call(Connection.Error error) {
+ handleDisconnected(error);
+ }
+
+ });
+ }
+
+ if (!connectionReady_ || tlsLayer_ == null) {
+ onConnectionFinished.emit(!connectionReady_);
+ }
+ }
+ }
+
+ private void handleDataRead(SafeByteArray data) {
+ onBOSHDataRead.emit(data);
+ buffer_.append(data);
+ String response = buffer_.toString();
+ if (!response.contains("\r\n\r\n")) {
+ onBOSHDataRead.emit(new SafeByteArray("[[Previous read incomplete, pending]]"));
+ return;
+ }
+ int httpCodeIndex = response.indexOf(" ")+1;
+ String httpCode = response.substring(httpCodeIndex,httpCodeIndex+3);
+ if (!"200".equals(httpCode)) {
+ onHTTPError.emit(httpCode);
+ return;
+ }
+ ByteArray boshData = new ByteArray(response.substring(response.indexOf("\r\n\r\n")));
+ BOSHBodyExtractor parser = new BOSHBodyExtractor(parserFactory_, boshData);
+ BOSHBody boshBody = parser.getBody();
+ if (boshBody != null) {
+ String typeAttribute = boshBody.getAttributes().getAttribute("type");
+ if ( "terminate".equals(typeAttribute) ) {
+ String conditionAttribute = boshBody.getAttributes().getAttribute("condition");
+ BOSHError.Type errorType = parseTerminationCondition(conditionAttribute);
+ onSessionTerminated.emit(errorType == BOSHError.Type.NoError ? null : new BOSHError(errorType));
+ }
+ buffer_.clear();
+ if (waitingForStartResponse_) {
+ waitingForStartResponse_ = false;
+ sid_ = boshBody.getAttributes().getAttribute("sid");
+ String requestsString = boshBody.getAttributes().getAttribute("requests");
+ Integer requests = Integer.valueOf(2);
+ if (requestsString != null && !requestsString.isEmpty()) {
+ try {
+ requests = Integer.valueOf(requestsString);
+ } catch (NumberFormatException e) {
+ requests = Integer.valueOf(2);
+ }
+ }
+ onSessionStarted.emit(sid_, requests);
+ }
+
+ SafeByteArray payload = new SafeByteArray(boshBody.getContent());
+ /* Say we're good to go again, so don't add anything after here in the method */
+ pending_ = false;
+ onXMPPDataRead.emit(payload);
+ }
+ }
+
+ private void handleDisconnected(Connection.Error error) {
+ cancelConnector();
+ onDisconnected.emit(error != null ? Boolean.TRUE : Boolean.FALSE);
+ sid_ = "";
+ connectionReady_ = false;
+ }
+
+ private Type parseTerminationCondition(String text) {
+ Type condition = Type.UndefinedCondition;
+ if ("bad-request".equals(text)) {
+ condition = Type.BadRequest;
+ }
+ else if ("host-gone".equals(text)) {
+ condition = Type.HostGone;
+ }
+ else if ("host-unknown".equals(text)) {
+ condition = Type.HostUnknown;
+ }
+ else if ("improper-addressing".equals(text)) {
+ condition = Type.ImproperAddressing;
+ }
+ else if ("internal-server-error".equals(text)) {
+ condition = Type.InternalServerError;
+ }
+ else if ("item-not-found".equals(text)) {
+ condition = Type.ItemNotFound;
+ }
+ else if ("other-request".equals(text)) {
+ condition = Type.OtherRequest;
+ }
+ else if ("policy-violation".equals(text)) {
+ condition = Type.PolicyViolation;
+ }
+ else if ("remote-connection-failed".equals(text)) {
+ condition = Type.RemoteConnectionFailed;
+ }
+ else if ("remote-stream-error".equals(text)) {
+ condition = Type.RemoteStreamError;
+ }
+ else if ("see-other-uri".equals(text)) {
+ condition = Type.SeeOtherURI;
+ }
+ else if ("system-shutdown".equals(text)) {
+ condition = Type.SystemShutdown;
+ }
+ else if ("".equals(text)) {
+ condition = Type.NoError;
+ }
+ return condition;
+ }
+
+ private void cancelConnector() {
+ if (connector_ != null) {
+ if (onConnectFinishedConnection != null) {
+ onConnectFinishedConnection.disconnect();
+ }
+ connector_.stop();
+ connector_ = null;
+ }
+ }
+
+ private void handleTLSConnected() {
+ logger.fine("\n");
+ onConnectionFinished.emit(Boolean.FALSE);
+ }
+
+ private void handleTLSApplicationDataRead(SafeByteArray data) {
+ logger.fine("\n");
+ handleDataRead(new SafeByteArray(data));
+ }
+
+ private void handleTLSNetworkDataWriteRequest(SafeByteArray data) {
+ logger.fine("\n");
+ connection_.write(data);
+ }
+
+ private void handleRawDataRead(SafeByteArray data) {
+ logger.fine("\n");
+ tlsLayer_.handleDataRead(data);
+ }
+
+ private void handleTLSError(TLSError error) {
+ // Empty Method
+ }
+
+ private void writeData(SafeByteArray data) {
+ if (tlsLayer_ != null) {
+ tlsLayer_.writeData(data);
+ }
+ else {
+ connection_.write(data);
+ }
+ }
+
+}
diff --git a/src/com/isode/stroke/network/BOSHConnectionPool.java b/src/com/isode/stroke/network/BOSHConnectionPool.java
new file mode 100644
index 0000000..9eefb83
--- /dev/null
+++ b/src/com/isode/stroke/network/BOSHConnectionPool.java
@@ -0,0 +1,417 @@
+/* Copyright (c) 2016, Isode Limited, London, England.
+ * All rights reserved.
+ *
+ * Acquisition and use of this software and related materials for any
+ * purpose requires a written license agreement from Isode Limited,
+ * or a written license from an organisation licensed by Isode Limited
+ * to grant such a license.
+ *
+ */
+package com.isode.stroke.network;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import com.isode.stroke.base.SafeByteArray;
+import com.isode.stroke.base.SafeString;
+import com.isode.stroke.base.URL;
+import com.isode.stroke.eventloop.EventLoop;
+import com.isode.stroke.network.BOSHConnection.BOSHError;
+import com.isode.stroke.parser.XMLParserFactory;
+import com.isode.stroke.signals.Signal;
+import com.isode.stroke.signals.Signal1;
+import com.isode.stroke.signals.SignalConnection;
+import com.isode.stroke.signals.Slot1;
+import com.isode.stroke.signals.Slot2;
+import com.isode.stroke.tls.Certificate;
+import com.isode.stroke.tls.CertificateVerificationError;
+import com.isode.stroke.tls.CertificateWithKey;
+import com.isode.stroke.tls.TLSContextFactory;
+import com.isode.stroke.tls.TLSOptions;
+
+public class BOSHConnectionPool {
+
+ private final URL boshURL_;
+ private ConnectionFactory connectionFactory_;
+ private final XMLParserFactory xmlParserFactory_;
+ private final TimerFactory timerFactory_;
+ private final List<BOSHConnection> connections_ = new ArrayList<BOSHConnection>();
+ private final Map<BOSHConnection, Set<SignalConnection>> connectionsSignalConnections_
+ = new HashMap<BOSHConnection, Set<SignalConnection>>();
+ private String sid_ = "";
+ private long rid_;
+ private final List<SafeByteArray> dataQueue_ = new ArrayList<SafeByteArray>();
+ private boolean pendingTerminate_;
+ private String to_;
+ private int requestLimit_;
+ private int restartCount_;
+ private boolean pendingRestart_;
+ private List<ConnectionFactory> myConnectionFactories_;
+ private final CachingDomainNameResolver resolver_;
+ private CertificateWithKey clientCertificate_;
+ private TLSContextFactory tlsContextFactory_;
+ private TLSOptions tlsOptions_;
+ private final List<Certificate> pinnedCertificateChain_ = new ArrayList<Certificate>();
+ private CertificateVerificationError lastVerificationError_;
+
+ public final Signal1<BOSHError> onSessionTerminated = new Signal1<BOSHError>();
+ public final Signal onSessionStarted = new Signal();
+ public final Signal1<SafeByteArray> onXMPPDataRead = new Signal1<SafeByteArray>();
+ public final Signal1<SafeByteArray> onBOSHDataRead = new Signal1<SafeByteArray>();
+ public final Signal1<SafeByteArray> onBOSHDataWritten = new Signal1<SafeByteArray>();
+
+ private final Logger logger = Logger.getLogger(this.getClass().getName());
+
+ public BOSHConnectionPool(URL boshURL,DomainNameResolver resolver,
+ ConnectionFactory connectionFactory, XMLParserFactory parserFactory,
+ TLSContextFactory tlsFactory, TimerFactory timerFactory, EventLoop eventLoop,
+ String to,long initialRID,URL boshHTTPConnectProxyURL,
+ SafeString boshHTTPConnectProxyAuthID,SafeString boshHTTPConnectProxyAuthPassword,
+ TLSOptions tlsOptions) {
+ this(boshURL, resolver, connectionFactory, parserFactory, tlsFactory, timerFactory,
+ eventLoop, to, initialRID, boshHTTPConnectProxyURL, boshHTTPConnectProxyAuthID,
+ boshHTTPConnectProxyAuthPassword, tlsOptions, null);
+ }
+
+ public BOSHConnectionPool(URL boshURL,DomainNameResolver realResolver,
+ ConnectionFactory connectionFactory, XMLParserFactory parserFactory,
+ TLSContextFactory tlsFactory, TimerFactory timerFactory, EventLoop eventLoop,
+ String to,long initialRID,URL boshHTTPConnectProxyURL,
+ SafeString boshHTTPConnectProxyAuthID,SafeString boshHTTPConnectProxyAuthPassword,
+ TLSOptions tlsOptions,HTTPTrafficFilter trafficFilter) {
+ boshURL_ = boshURL;
+ connectionFactory_ = connectionFactory;
+ xmlParserFactory_ = parserFactory;
+ timerFactory_ = timerFactory;
+ rid_ = initialRID;
+ pendingTerminate_ = false;
+ to_ = to;
+ requestLimit_ = 2;
+ restartCount_ = 0;
+ pendingRestart_ = false;
+ tlsContextFactory_ = tlsFactory;
+ tlsOptions_ = tlsOptions;
+ if (!boshHTTPConnectProxyURL.isEmpty()) {
+ this.connectionFactory_ =
+ new HTTPConnectProxiedConnectionFactory(realResolver, connectionFactory,
+ timerFactory, boshHTTPConnectProxyURL.getHost(),
+ URL.getPortOrDefaultPort(boshHTTPConnectProxyURL),
+ boshHTTPConnectProxyAuthID.getData(),
+ boshHTTPConnectProxyAuthPassword.getData(), trafficFilter);
+ }
+ resolver_ = new CachingDomainNameResolver(realResolver, eventLoop);
+ }
+
+ public void open() {
+ createConnection();
+ }
+
+ public void write(SafeByteArray data) {
+ dataQueue_.add(data);
+ tryToSendQueuedData();
+ }
+
+ public void writeFooter() {
+ pendingTerminate_ = true;
+ tryToSendQueuedData();
+ }
+
+ public void close() {
+ if (!sid_.isEmpty()) {
+ writeFooter();
+ }
+ else {
+ pendingTerminate_ = true;
+ List<BOSHConnection> connectionCopies = new ArrayList<BOSHConnection>(connections_);
+ for(BOSHConnection connection : connectionCopies) {
+ if (connection != null) {
+ connection.disconnect();
+ }
+ }
+ }
+ }
+
+ public void restartStream() {
+ BOSHConnection connection = getSuitableConnection();
+ if (connection != null) {
+ pendingRestart_ = false;
+ rid_++;
+ connection.setRID(rid_);
+ connection.restartStream();
+ restartCount_++;
+ }
+ else {
+ pendingRestart_ = true;
+ }
+ }
+
+ public void setTLSCertificate(CertificateWithKey certWithKey) {
+ clientCertificate_ = certWithKey;
+ }
+
+ public boolean isTLSEncrypted() {
+ return !pinnedCertificateChain_.isEmpty();
+ }
+
+ public Certificate getPeerCertificate() {
+ Certificate peerCertificate = null;
+ if (!pinnedCertificateChain_.isEmpty()) {
+ peerCertificate = pinnedCertificateChain_.get(0);
+ }
+ return peerCertificate;
+ }
+
+
+ public List<Certificate> getPeerCertificateChain() {
+ return new ArrayList<Certificate>(pinnedCertificateChain_);
+ }
+
+ public CertificateVerificationError getPeerCertificateVerificationError() {
+ return lastVerificationError_;
+ }
+
+ private void handleDataRead(SafeByteArray data) {
+ onXMPPDataRead.emit(data);
+ tryToSendQueuedData(); // Will rebalance the connections
+ }
+
+ private void handleSessionStarted(String sid, int requests) {
+ sid_ = sid;
+ requestLimit_ = requests;
+ onSessionStarted.emit();
+ }
+
+ private void handleBOSHDataRead(SafeByteArray data) {
+ onBOSHDataRead.emit(data);
+ }
+
+ private void handleBOSHDataWritten(SafeByteArray data) {
+ onBOSHDataWritten.emit(data);
+ }
+
+ private void handleSessionTerminated(BOSHError error) {
+ onSessionTerminated.emit(error);
+ }
+
+ private void handleConnectFinished(boolean error, BOSHConnection connection) {
+ if (error) {
+ onSessionTerminated.emit(new BOSHError(BOSHError.Type.UndefinedCondition));
+ }
+ else {
+ if ((connection.getPeerCertificate() != null) && pinnedCertificateChain_.isEmpty()) {
+ pinnedCertificateChain_.clear();
+ pinnedCertificateChain_.addAll(connection.getPeerCertificateChain());
+ }
+ if (!pinnedCertificateChain_.isEmpty()) {
+ lastVerificationError_ = connection.getPeerCertificateVerficationError();
+ }
+
+ if (sid_.isEmpty()) {
+ connection.startStream(to_, rid_);
+ }
+ if (pendingRestart_) {
+ restartStream();
+ }
+ tryToSendQueuedData();
+ }
+ }
+
+ private void handleConnectionDisconnected(boolean error, BOSHConnection connection) {
+ destroyConnection(connection);
+ if (pendingTerminate_ && sid_.isEmpty() && connections_.isEmpty()) {
+ handleSessionTerminated(null);
+ }
+ else {
+ /* We might have just freed up a connection slot to send with */
+ tryToSendQueuedData();
+ }
+ }
+
+ private void handleHTTPError(String errorCode) {
+ handleSessionTerminated(new BOSHError(BOSHError.Type.UndefinedCondition));
+ }
+
+ private BOSHConnection createConnection() {
+ Connector connector = Connector.create(boshURL_.getHost(),
+ URL.getPortOrDefaultPort(boshURL_), null, resolver_,
+ connectionFactory_, timerFactory_);
+ final BOSHConnection connection = BOSHConnection.create(boshURL_, connector,
+ xmlParserFactory_, tlsContextFactory_, tlsOptions_);
+ Set<SignalConnection> signalConnections = new HashSet<SignalConnection>();
+ signalConnections.add(connection.onXMPPDataRead.connect(new Slot1<SafeByteArray>() {
+
+ @Override
+ public void call(SafeByteArray data) {
+ handleDataRead(data);
+ }
+
+ }));
+ signalConnections.add(connection.onSessionStarted.connect(new Slot2<String, Integer>() {
+
+ @Override
+ public void call(String sid, Integer requests) {
+ handleSessionStarted(sid, requests.intValue());
+ }
+
+ }));
+ signalConnections.add(connection.onBOSHDataRead.connect(new Slot1<SafeByteArray>() {
+
+ @Override
+ public void call(SafeByteArray data) {
+ handleBOSHDataRead(data);
+ }
+
+ }));
+ signalConnections.add(connection.onBOSHDataWritten.connect(new Slot1<SafeByteArray>() {
+
+ @Override
+ public void call(SafeByteArray data) {
+ handleBOSHDataWritten(data);
+ }
+
+ }));
+ signalConnections.add(connection.onDisconnected.connect(new Slot1<Boolean>() {
+
+ @Override
+ public void call(Boolean wasError) {
+ handleConnectionDisconnected(wasError.booleanValue(), connection);
+ }
+
+ }));
+ signalConnections.add(connection.onConnectionFinished.connect(new Slot1<Boolean>() {
+
+ @Override
+ public void call(Boolean wasError) {
+ handleConnectFinished(wasError.booleanValue(), connection);
+ }
+
+ }));
+ signalConnections.add(connection.onSessionTerminated.connect(new Slot1<BOSHConnection.BOSHError>() {
+
+ @Override
+ public void call(BOSHError error) {
+ handleSessionTerminated(error);
+ }
+
+ }));
+ signalConnections.add(connection.onHTTPError.connect(new Slot1<String>() {
+
+ @Override
+ public void call(String httpErrorCode) {
+ handleHTTPError(httpErrorCode);
+ }
+
+ }));
+
+ if ("https".equals(boshURL_.getScheme())) {
+ boolean success = connection.setClientCertificate(clientCertificate_);
+ logger.fine("setClientCertificate, success: " + success + "\n");
+ }
+
+ connection.connect();
+ connections_.add(connection);
+ connectionsSignalConnections_.put(connection, signalConnections);
+ return connection;
+
+ }
+
+ private void destroyConnection(BOSHConnection connection) {
+ while (connections_.remove(connection)) {
+ // Loop will run till all instances of connection are removed
+ }
+ Set<SignalConnection> signalConnections = connectionsSignalConnections_.remove(connection);
+ if (signalConnections != null) {
+ for (SignalConnection signalConnection : signalConnections) {
+ if (signalConnection != null) {
+ signalConnection.disconnect();
+ }
+ }
+ signalConnections.clear();
+ }
+ }
+
+ private void tryToSendQueuedData() {
+ if (sid_.isEmpty()) {
+ // If we've not got as far as stream start yet, pend
+ return;
+ }
+
+ BOSHConnection suitableConnection = getSuitableConnection();
+ boolean toSend = !dataQueue_.isEmpty();
+ if (suitableConnection != null) {
+ if (toSend) {
+ rid_++;
+ suitableConnection.setRID(rid_);
+ SafeByteArray data = new SafeByteArray();
+ for(SafeByteArray datum : dataQueue_) {
+ data.append(datum);
+ }
+ suitableConnection.write(data);
+ dataQueue_.clear();
+ }
+ else if (pendingTerminate_) {
+ rid_++;
+ suitableConnection.setRID(rid_);
+ suitableConnection.terminateStream();
+ sid_ = "";
+ close();
+ }
+ }
+ if (!pendingTerminate_) {
+ // Ensure there's always a session waiting to read data for us
+ boolean pending = false;
+ for(BOSHConnection connection : connections_) {
+ if (connection != null && !connection.isReadyToSend()) {
+ pending = true;
+ }
+ }
+ if (!pending) {
+ if (restartCount_ >= 1) {
+ // Don't open a second connection until we've restarted the stream twice - i.e. we've authed and resource bound
+ if (suitableConnection != null) {
+ rid_++;
+ suitableConnection.setRID(rid_);
+ suitableConnection.write(new SafeByteArray());
+ }
+ else {
+ // My thought process I went through when writing this, to aid anyone else confused why this can happen...
+ //
+ // What to do here? I think this isn't possible.
+ // If you didn't have two connections, suitable would have made one.
+ // If you have two connections and neither is suitable, pending would be true.
+ // If you have a non-pending connection, it's suitable.
+ //
+ // If I decide to do something here, remove assert above.
+ //
+ // Ah! Yes, because there's a period between creating the connection and it being connected. */
+ }
+ }
+ }
+ }
+ }
+
+ private BOSHConnection getSuitableConnection() {
+ BOSHConnection suitableConnection = null;
+ for(BOSHConnection connection : connections_) {
+ if (connection.isReadyToSend()) {
+ suitableConnection = connection;
+ break;
+ }
+ }
+
+ if (suitableConnection == null && (connections_.size() < requestLimit_)) {
+ // This is not a suitable connection because it won't have yet connected and added TLS if needed.
+ BOSHConnection newConnection = createConnection();
+ newConnection.setSID(sid_);
+ }
+ assert(connections_.size() <= requestLimit_);
+ assert((suitableConnection == null) || suitableConnection.isReadyToSend());
+ return suitableConnection;
+ }
+
+}