diff options
17 files changed, 1249 insertions, 26 deletions
diff --git a/src/com/isode/stroke/adhoc/OutgoingAdHocCommandSession.java b/src/com/isode/stroke/adhoc/OutgoingAdHocCommandSession.java index 5bccb31..4369f8e 100644 --- a/src/com/isode/stroke/adhoc/OutgoingAdHocCommandSession.java +++ b/src/com/isode/stroke/adhoc/OutgoingAdHocCommandSession.java @@ -23,6 +23,7 @@ import com.isode.stroke.queries.GenericRequest; import com.isode.stroke.queries.IQRouter; import com.isode.stroke.signals.Signal1; import com.isode.stroke.signals.Slot2; +import com.isode.stroke.signals.SignalConnection; /** * This class maintains the session between the client and the server for an @@ -71,6 +72,7 @@ public class OutgoingAdHocCommandSession { private boolean isMultiStage_; private String sessionID_; private HashMap<Action, ActionState> actionStates_ = new HashMap<Action, ActionState>(); + private SignalConnection connection_; /** * Create an Ad-Hoc command session. The initial command will be sent to the @@ -101,6 +103,13 @@ public class OutgoingAdHocCommandSession { isMultiStage_ = false; } + /** + * This method needs to be called for the object to be eligible for garbage collection. + */ + public void delete() { + connection_.disconnect(); + } + private void handleResponse(Command payload, ErrorPayload error) { if (error != null) { onError.emit(error); @@ -156,7 +165,7 @@ public class OutgoingAdHocCommandSession { public void start() { GenericRequest<Command> commandRequest = new GenericRequest<Command>( IQ.Type.Set, to_, new Command(commandNode_), iqRouter_); - commandRequest.onResponse.connect(new Slot2<Command, ErrorPayload>() { + connection_ = commandRequest.onResponse.connect(new Slot2<Command, ErrorPayload>() { public void call(Command payload, ErrorPayload error) { handleResponse(payload, error); } @@ -206,7 +215,8 @@ public class OutgoingAdHocCommandSession { GenericRequest<Command> commandRequest = new GenericRequest<Command>( IQ.Type.Set, to_, command, iqRouter_); - commandRequest.onResponse.connect(new Slot2<Command, ErrorPayload>() { + connection_.disconnect(); + connection_ = commandRequest.onResponse.connect(new Slot2<Command, ErrorPayload>() { public void call(Command payload, ErrorPayload error) { handleResponse(payload, error); } diff --git a/src/com/isode/stroke/client/CoreClient.java b/src/com/isode/stroke/client/CoreClient.java index 84ea673..efefa1c 100644 --- a/src/com/isode/stroke/client/CoreClient.java +++ b/src/com/isode/stroke/client/CoreClient.java @@ -5,6 +5,7 @@ package com.isode.stroke.client; import com.isode.stroke.base.NotNull; +import com.isode.stroke.base.SafeByteArray; import com.isode.stroke.elements.Message; import com.isode.stroke.elements.Presence; import com.isode.stroke.elements.Stanza; @@ -58,13 +59,13 @@ public class CoreClient { * The user may add a listener to this signal, which will be called when * data are received from the server. Useful for observing protocol exchange. */ - public final Signal1<String> onDataRead = new Signal1<String>(); + public final Signal1<SafeByteArray> onDataRead = new Signal1<SafeByteArray>(); /** * The user may add a listener to this signal, which will be called when * data are sent to the server. Useful for observing protocol exchange. */ - public final Signal1<String> onDataWritten = new Signal1<String>(); + public final Signal1<SafeByteArray> onDataWritten = new Signal1<SafeByteArray>(); /** * Called when a message stanza is received. @@ -249,16 +250,16 @@ public class CoreClient { if (certificate_ != null && !certificate_.isNull()) { sessionStream_.setTLSCertificate(certificate_); } - sessionStreamDataReadConnection_ = sessionStream_.onDataRead.connect(new Slot1<String>() { + sessionStreamDataReadConnection_ = sessionStream_.onDataRead.connect(new Slot1<SafeByteArray>() { - public void call(String p1) { + public void call(SafeByteArray p1) { handleDataRead(p1); } }); - sessionStreamDataWrittenConnection_ = sessionStream_.onDataWritten.connect(new Slot1<String>() { + sessionStreamDataWrittenConnection_ = sessionStream_.onDataWritten.connect(new Slot1<SafeByteArray>() { - public void call(String p1) { + public void call(SafeByteArray p1) { handleDataWritten(p1); } }); @@ -421,11 +422,11 @@ public class CoreClient { session_.sendCredentials(password_); } - private void handleDataRead(final String data) { + private void handleDataRead(final SafeByteArray data) { onDataRead.emit(data); } - private void handleDataWritten(final String data) { + private void handleDataWritten(final SafeByteArray data) { onDataWritten.emit(data); } diff --git a/src/com/isode/stroke/component/Component.java b/src/com/isode/stroke/component/Component.java new file mode 100644 index 0000000..61c2bb2 --- /dev/null +++ b/src/com/isode/stroke/component/Component.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2010-2013 Isode Limited. + * All rights reserved. + * See the COPYING file for more information. + */ +/* + * Copyright (c) 2015 Tarun Gupta. + * Licensed under the simplified BSD license. + * See Documentation/Licenses/BSD-simplified.txt for more information. + */ + +package com.isode.stroke.component; + +import com.isode.stroke.component.CoreComponent; +import com.isode.stroke.queries.responders.SoftwareVersionResponder; +import com.isode.stroke.jid.JID; +import com.isode.stroke.network.NetworkFactories; + +/** + * Provides the core functionality for writing XMPP component software. + * + * Besides connecting to an XMPP server, this class also provides interfaces for + * performing most component tasks on the XMPP network. + */ +public class Component extends CoreComponent { + + private SoftwareVersionResponder softwareVersionResponder; + + public Component(JID jid, String secret, NetworkFactories networkFactories) { + super(jid, secret, networkFactories); + softwareVersionResponder = new SoftwareVersionResponder(getIQRouter()); + softwareVersionResponder.start(); + } + + public void delete() { + softwareVersionResponder.stop(); + } + + /** + * Sets the software version of the client. + * + * This will be used to respond to version queries from other entities. + */ + public void setSoftwareVersion(final String name, final String version) { + softwareVersionResponder.setVersion(name, version); + } +}
\ No newline at end of file diff --git a/src/com/isode/stroke/component/ComponentConnector.java b/src/com/isode/stroke/component/ComponentConnector.java new file mode 100644 index 0000000..795fd7c --- /dev/null +++ b/src/com/isode/stroke/component/ComponentConnector.java @@ -0,0 +1,160 @@ +/* + * Copyright (c) 2010 Isode Limited. + * All rights reserved. + * See the COPYING file for more information. + */ +/* + * Copyright (c) 2015 Tarun Gupta. + * Licensed under the simplified BSD license. + * See Documentation/Licenses/BSD-simplified.txt for more information. + */ + +package com.isode.stroke.component; + +import com.isode.stroke.network.Connection; +import com.isode.stroke.network.JavaConnection; +import com.isode.stroke.network.Timer; +import com.isode.stroke.network.HostAddressPort; +import com.isode.stroke.network.DomainNameResolveError; +import com.isode.stroke.network.DomainNameAddressQuery; +import com.isode.stroke.network.DomainNameResolver; +import com.isode.stroke.network.ConnectionFactory; +import com.isode.stroke.network.TimerFactory; +import com.isode.stroke.network.HostAddress; +import com.isode.stroke.signals.Signal1; +import com.isode.stroke.signals.SignalConnection; +import com.isode.stroke.signals.Slot2; +import com.isode.stroke.signals.Slot1; +import com.isode.stroke.signals.Slot; +import java.util.Vector; +import java.util.Collection; + +public class ComponentConnector { + + private String hostname = ""; + private int port; + private DomainNameResolver resolver; + private ConnectionFactory connectionFactory; + private TimerFactory timerFactory; + private int timeoutMilliseconds; + private Timer timer; + private DomainNameAddressQuery addressQuery; + private Vector<HostAddress> addressQueryResults = new Vector<HostAddress>(); + private Connection currentConnection; + private SignalConnection onConnectFinishedConnection; + private SignalConnection onTickConnection; + private SignalConnection onResultConnection; + + public ComponentConnector(final String hostname, int port, DomainNameResolver resolver, ConnectionFactory connectionFactory, TimerFactory timerFactory) { + this.hostname = hostname; + this.port = port; + this.resolver = resolver; + this.connectionFactory = connectionFactory; + this.timerFactory = timerFactory; + this.timeoutMilliseconds = 0; + } + + public static ComponentConnector create(final String hostname, int port, DomainNameResolver resolver, ConnectionFactory connectionFactory, TimerFactory timerFactory) { + return new ComponentConnector(hostname, port, resolver, connectionFactory, timerFactory); + } + + public void setTimeoutMilliseconds(int milliseconds) { + timeoutMilliseconds = milliseconds; + } + + public void start() { + assert(currentConnection == null); + assert(timer == null); + assert(addressQuery == null); + addressQuery = resolver.createAddressQuery(hostname); + onResultConnection = addressQuery.onResult.connect(new Slot2<Collection<HostAddress>, DomainNameResolveError>() { + @Override + public void call(Collection<HostAddress> c1, DomainNameResolveError d1) { + handleAddressQueryResult(c1, d1); + } + }); + if (timeoutMilliseconds > 0) { + timer = timerFactory.createTimer(timeoutMilliseconds); + onTickConnection = timer.onTick.connect(new Slot() { + @Override + public void call() { + handleTimeout(); + } + }); + timer.start(); + } + addressQuery.run(); + } + + public void stop() { + finish((Connection)null); + } + + public final Signal1<Connection> onConnectFinished = new Signal1<Connection>(); + + private void handleAddressQueryResult(final Collection<HostAddress> addresses, DomainNameResolveError error) { + addressQuery = null; + if (error != null || addresses.isEmpty()) { + finish((Connection)null); + } + else { + addressQueryResults.addAll(addresses); + tryNextAddress(); + } + } + + private void tryNextAddress() { + assert(!addressQueryResults.isEmpty()); + HostAddress address = addressQueryResults.remove(0); + tryConnect(new HostAddressPort(address, port)); + } + + private void tryConnect(final HostAddressPort target) { + assert(currentConnection == null); + currentConnection = connectionFactory.createConnection(); + onConnectFinishedConnection = currentConnection.onConnectFinished.connect(new Slot1<Boolean>() { + @Override + public void call(Boolean b) { + handleConnectionConnectFinished(b); + } + }); + currentConnection.connect(target); + } + + private void handleConnectionConnectFinished(boolean error) { + onConnectFinishedConnection.disconnect(); + if (error) { + currentConnection = null; + if (!addressQueryResults.isEmpty()) { + tryNextAddress(); + } + else { + finish((Connection)null); + } + } + else { + finish(currentConnection); + } + } + + private void finish(Connection connection) { + if (timer != null) { + timer.stop(); + onTickConnection.disconnect(); + timer = null; + } + if (addressQuery != null) { + onResultConnection.disconnect(); + addressQuery = null; + } + if (currentConnection != null) { + onConnectFinishedConnection.disconnect(); + currentConnection = null; + } + onConnectFinished.emit(connection); + } + + private void handleTimeout() { + finish((Connection)null); + } +}
\ No newline at end of file diff --git a/src/com/isode/stroke/component/ComponentError.java b/src/com/isode/stroke/component/ComponentError.java new file mode 100644 index 0000000..576e3dc --- /dev/null +++ b/src/com/isode/stroke/component/ComponentError.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2010-2015 Isode Limited. + * All rights reserved. + * See the COPYING file for more information. + */ +/* + * Copyright (c) 2015 Tarun Gupta. + * Licensed under the simplified BSD license. + * See Documentation/Licenses/BSD-simplified.txt for more information. + */ + +package com.isode.stroke.component; + +public class ComponentError { + + public enum Type { + UnknownError, + ConnectionError, + ConnectionReadError, + ConnectionWriteError, + XMLError, + AuthenticationFailedError, + UnexpectedElementError + }; + + private Type type_; + + public ComponentError() { + this(Type.UnknownError); + } + + public ComponentError(Type type) { + this.type_ = type; + } + + public Type getType() { + return type_; + } +}
\ No newline at end of file diff --git a/src/com/isode/stroke/component/ComponentSession.java b/src/com/isode/stroke/component/ComponentSession.java new file mode 100644 index 0000000..40bb391 --- /dev/null +++ b/src/com/isode/stroke/component/ComponentSession.java @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2010-2014 Isode Limited. + * All rights reserved. + * See the COPYING file for more information. + */ +/* + * Copyright (c) 2015 Tarun Gupta. + * Licensed under the simplified BSD license. + * See Documentation/Licenses/BSD-simplified.txt for more information. + */ + +package com.isode.stroke.component; + +import com.isode.stroke.jid.JID; +import com.isode.stroke.elements.Stanza; +import com.isode.stroke.elements.Element; +import com.isode.stroke.elements.ProtocolHeader; +import com.isode.stroke.elements.ComponentHandshake; +import com.isode.stroke.elements.StreamFeatures; +import com.isode.stroke.session.SessionStream; +import com.isode.stroke.crypto.CryptoProvider; +import com.isode.stroke.crypto.JavaCryptoProvider; +import com.isode.stroke.signals.Signal1; +import com.isode.stroke.signals.Signal; +import com.isode.stroke.signals.SignalConnection; +import com.isode.stroke.signals.Slot1; + +public class ComponentSession { + + public enum State { + Initial, + WaitingForStreamStart, + Authenticating, + Initialized, + Finishing, + Finished + }; + + public static class Error implements com.isode.stroke.base.Error { + public enum Type { + AuthenticationFailedError, + UnexpectedElementError + } + public Type type; + public Error(Type type) { + if (type == null) { + throw new IllegalStateException(); + } + this.type = type; + } + }; + + private JID jid = new JID(); + private String secret = ""; + private SessionStream stream; + private CryptoProvider crypto; + private com.isode.stroke.base.Error error; + private State state; + public final Signal onInitialized = new Signal(); + public final Signal1<com.isode.stroke.base.Error> onFinished = new Signal1<com.isode.stroke.base.Error>(); + public final Signal1<Stanza> onStanzaReceived = new Signal1<Stanza>(); + private SignalConnection onStreamStartReceivedConnection; + private SignalConnection onElementReceivedConnection; + private SignalConnection onClosedConnection; + + public static ComponentSession create(final JID jid, final String secret, SessionStream stream, CryptoProvider crypto) { + return new ComponentSession(jid, secret, stream, crypto); + } + + public State getState() { + return state; + } + + public void start() { + onStreamStartReceivedConnection = stream.onStreamStartReceived.connect(new Slot1<ProtocolHeader>() { + @Override + public void call(ProtocolHeader p1) { + handleStreamStart(p1); + } + }); + onElementReceivedConnection = stream.onElementReceived.connect(new Slot1<Element>() { + @Override + public void call(Element e1) { + handleElement(e1); + } + }); + onClosedConnection = stream.onClosed.connect(new Slot1<SessionStream.Error>() { + @Override + public void call(SessionStream.Error e1) { + handleStreamClosed(e1); + } + }); + + assert(State.Initial.equals(state)); + state = State.WaitingForStreamStart; + sendStreamHeader(); + } + + public void finish() { + finishSession((Error.Type)null); + } + + public void sendStanza(Stanza stanza) { + stream.writeElement(stanza); + } + + private ComponentSession(final JID jid, final String secret, SessionStream stream, CryptoProvider crypto) { + this.jid = jid; + this.secret = secret; + this.stream = stream; + this.crypto = crypto; + this.state = State.Initial; + } + + private void finishSession(Error.Type error) { + Error localError = null; + if (error != null) { + localError = new Error(error); + } + finishSession(localError); + } + + private void finishSession(com.isode.stroke.base.Error finishError) { + state = State.Finishing; + error = finishError; + assert(stream.isOpen() == true); + stream.writeFooter(); + stream.close(); + } + + private void sendStreamHeader() { + ProtocolHeader header = new ProtocolHeader(); + header.setTo(jid.toString()); + stream.writeHeader(header); + } + + private void handleElement(Element element) { + if(element instanceof Stanza) { + Stanza stanza = (Stanza)element; + if (State.Initialized.equals(getState())) { + onStanzaReceived.emit(stanza); + } + else { + finishSession(Error.Type.UnexpectedElementError); + } + } + else if (element instanceof ComponentHandshake) { + if (!checkState(State.Authenticating)) { + return; + } + stream.setWhitespacePingEnabled(true); + state = State.Initialized; + onInitialized.emit(); + } + else if (State.Authenticating.equals(getState())) { + if (element instanceof StreamFeatures) { + // M-Link sends stream features, so swallow that. + } + else { + // FIXME: We should actually check the element received + finishSession(Error.Type.AuthenticationFailedError); + } + } + else { + finishSession(Error.Type.UnexpectedElementError); + } + } + + private void handleStreamStart(final ProtocolHeader header) { + checkState(State.WaitingForStreamStart); + state = State.Authenticating; + stream.writeElement(new ComponentHandshake(ComponentHandshakeGenerator.getHandshake(header.getID(), secret, crypto))); + } + + private void handleStreamClosed(SessionStream.Error streamError) { + State oldState = state; + state = State.Finished; + stream.setWhitespacePingEnabled(false); + onStreamStartReceivedConnection.disconnect(); + onElementReceivedConnection.disconnect(); + onClosedConnection.disconnect(); + if (State.Finishing.equals(oldState)) { + onFinished.emit(error); + } + else { + onFinished.emit(streamError); + } + } + + private boolean checkState(State state) { + if (!(this.state.equals(state))) { + finishSession(Error.Type.UnexpectedElementError); + return false; + } + return true; + } +} + + + diff --git a/src/com/isode/stroke/component/ComponentSessionStanzaChannel.java b/src/com/isode/stroke/component/ComponentSessionStanzaChannel.java new file mode 100644 index 0000000..8d97bef --- /dev/null +++ b/src/com/isode/stroke/component/ComponentSessionStanzaChannel.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2010-2015 Isode Limited. + * All rights reserved. + * See the COPYING file for more information. + */ +/* + * Copyright (c) 2015 Tarun Gupta. + * Licensed under the simplified BSD license. + * See Documentation/Licenses/BSD-simplified.txt for more information. + */ + +package com.isode.stroke.component; + +import com.isode.stroke.base.IDGenerator; +import com.isode.stroke.component.ComponentSession; +import com.isode.stroke.client.StanzaChannel; +import com.isode.stroke.elements.Message; +import com.isode.stroke.elements.IQ; +import com.isode.stroke.elements.Presence; +import com.isode.stroke.elements.Stanza; +import com.isode.stroke.tls.Certificate; +import com.isode.stroke.signals.SignalConnection; +import com.isode.stroke.signals.Slot; +import com.isode.stroke.signals.Slot1; +import java.util.Vector; + +public class ComponentSessionStanzaChannel extends StanzaChannel { + + private IDGenerator idGenerator; + private ComponentSession session; + private SignalConnection onInitializedConnection; + private SignalConnection onFinishedConnection; + private SignalConnection onStanzaReceivedConnection; + + public void setSession(ComponentSession session) { + assert(this.session == null); + this.session = session; + onInitializedConnection = session.onInitialized.connect(new Slot() { + @Override + public void call() { + handleSessionInitialized(); + } + }); + onFinishedConnection = session.onFinished.connect(new Slot1<com.isode.stroke.base.Error>() { + @Override + public void call(com.isode.stroke.base.Error e1) { + handleSessionFinished(e1); + } + }); + onStanzaReceivedConnection = session.onStanzaReceived.connect(new Slot1<Stanza>() { + @Override + public void call(Stanza s1) { + handleStanza(s1); + } + }); + } + + public void sendIQ(IQ iq) { + send(iq); + } + + public void sendMessage(Message message) { + send(message); + } + + public void sendPresence(Presence presence) { + send(presence); + } + + public boolean getStreamManagementEnabled() { + return false; + } + + public Vector<Certificate> getPeerCertificateChain() { + // TODO: actually implement this method + return (Vector<Certificate>)null; + } + + public boolean isAvailable() { + return (session != null) && (ComponentSession.State.Initialized.equals(session.getState())); + } + + public String getNewIQID() { + return idGenerator.generateID(); + } + + private void send(Stanza stanza) { + if (!isAvailable()) { + System.err.println("Warning: Component: Trying to send a stanza while disconnected.\n"); + return; + } + session.sendStanza(stanza); + } + + private void handleSessionFinished(com.isode.stroke.base.Error error) { + onFinishedConnection.disconnect(); + onStanzaReceivedConnection.disconnect(); + onInitializedConnection.disconnect(); + session = null; + + onAvailableChanged.emit(false); + } + + private void handleStanza(Stanza stanza) { + if(stanza instanceof Message) { + Message message = (Message)(stanza); + onMessageReceived.emit(message); + return; + } + + if(stanza instanceof Presence) { + Presence presence = (Presence)(stanza); + onPresenceReceived.emit(presence); + return; + } + + if(stanza instanceof IQ) { + IQ iq = (IQ)(stanza); + onIQReceived.emit(iq); + return; + } + } + + private void handleSessionInitialized() { + onAvailableChanged.emit(true); + } +}
\ No newline at end of file diff --git a/src/com/isode/stroke/component/ComponentXMLTracer.java b/src/com/isode/stroke/component/ComponentXMLTracer.java new file mode 100644 index 0000000..005a937 --- /dev/null +++ b/src/com/isode/stroke/component/ComponentXMLTracer.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2010 Isode Limited. + * All rights reserved. + * See the COPYING file for more information. + */ +/* + * Copyright (c) 2015 Tarun Gupta. + * Licensed under the simplified BSD license. + * See Documentation/Licenses/BSD-simplified.txt for more information. + */ + +package com.isode.stroke.component; + +import com.isode.stroke.component.Component; +import com.isode.stroke.component.CoreComponent; +import com.isode.stroke.base.SafeByteArray; +import com.isode.stroke.signals.Slot1; + +public class ComponentXMLTracer { + + public ComponentXMLTracer(CoreComponent client) { + client.onDataRead.connect(new Slot1<SafeByteArray>() { + @Override + public void call(SafeByteArray b1) { + printData('<', b1); + } + }); + + client.onDataWritten.connect(new Slot1<SafeByteArray>() { + @Override + public void call(SafeByteArray b1) { + printData('>', b1); + } + }); + } + + private static void printData(char direction, final SafeByteArray data) { + printLine(direction); + System.err.println(data); + } + + private static void printLine(char c) { + for (int i = 0; i < 80; ++i) { + System.err.print(c); + } + System.err.println(); + } +}
\ No newline at end of file diff --git a/src/com/isode/stroke/component/CoreComponent.java b/src/com/isode/stroke/component/CoreComponent.java new file mode 100644 index 0000000..63b9cd5 --- /dev/null +++ b/src/com/isode/stroke/component/CoreComponent.java @@ -0,0 +1,271 @@ +/* + * Copyright (c) 2010-2015 Isode Limited. + * All rights reserved. + * See the COPYING file for more information. + */ +/* + * Copyright (c) 2015 Tarun Gupta. + * Licensed under the simplified BSD license. + * See Documentation/Licenses/BSD-simplified.txt for more information. + */ + +package com.isode.stroke.component; + +import com.isode.stroke.base.Error; +import com.isode.stroke.component.ComponentConnector; +import com.isode.stroke.component.ComponentSession; +import com.isode.stroke.component.ComponentError; +import com.isode.stroke.component.ComponentSessionStanzaChannel; +import com.isode.stroke.elements.Presence; +import com.isode.stroke.elements.Message; +import com.isode.stroke.elements.StreamType; +import com.isode.stroke.jid.JID; +import com.isode.stroke.parser.payloadparsers.FullPayloadParserFactoryCollection; +import com.isode.stroke.serializer.payloadserializers.FullPayloadSerializerCollection; +import com.isode.stroke.entity.Entity; +import com.isode.stroke.base.SafeByteArray; +import com.isode.stroke.eventloop.EventLoop; +import com.isode.stroke.queries.IQRouter; +import com.isode.stroke.network.NetworkFactories; +import com.isode.stroke.network.Connection; +import com.isode.stroke.session.BasicSessionStream; +import com.isode.stroke.session.SessionStream; +import com.isode.stroke.signals.Signal1; +import com.isode.stroke.signals.Signal; +import com.isode.stroke.signals.SignalConnection; +import com.isode.stroke.signals.Slot1; +import com.isode.stroke.client.StanzaChannel; + +/** + * The central class for communicating with an XMPP server as a component. + * + * This class is responsible for setting up the connection with the XMPP + * server and authenticating the component. + * + * This class can be used directly in your application, although the Component + * subclass provides more functionality and interfaces, and is better suited + * for most needs. + */ +public class CoreComponent extends Entity { + + private NetworkFactories networkFactories; + private JID jid_; + private String secret_; + private ComponentSessionStanzaChannel stanzaChannel_; + private IQRouter iqRouter_; + private ComponentConnector connector_; + private Connection connection_; + private BasicSessionStream sessionStream_; + private ComponentSession session_; + private boolean disconnectRequested_; + private SignalConnection onMessageReceivedConnection; + private SignalConnection onPresenceReceivedConnection; + private SignalConnection onAvailableChangedConnection; + private SignalConnection onConnectFinishedConnection; + private SignalConnection onDataReadConnection; + private SignalConnection onDataWrittenConnection; + private SignalConnection onFinishedConnection; + + public final Signal1<ComponentError> onError = new Signal1<ComponentError>(); + public final Signal onConnected = new Signal(); + public final Signal1<SafeByteArray> onDataRead = new Signal1<SafeByteArray>(); + public final Signal1<SafeByteArray> onDataWritten = new Signal1<SafeByteArray>(); + + public final Signal1<Message> onMessageReceived = new Signal1<Message>(); + public final Signal1<Presence> onPresenceReceived = new Signal1<Presence>(); + + public CoreComponent(final JID jid, final String secret, NetworkFactories networkFactories) { + this.networkFactories = networkFactories; + this.jid_ = jid; + this.secret_ = secret; + this.disconnectRequested_ = false; + stanzaChannel_ = new ComponentSessionStanzaChannel(); + onMessageReceivedConnection = stanzaChannel_.onMessageReceived.connect(onMessageReceived); + onPresenceReceivedConnection = stanzaChannel_.onPresenceReceived.connect(onPresenceReceived); + onAvailableChangedConnection = stanzaChannel_.onAvailableChanged.connect(new Slot1<Boolean>() { + @Override + public void call(Boolean b1) { + handleStanzaChannelAvailableChanged(b1); + } + }); + iqRouter_ = new IQRouter(stanzaChannel_); + iqRouter_.setFrom(jid); + } + + /** + * This method needs to be called for the object to be eligible for garbage collection. + */ + public void delete() { + if (session_ != null || connection_ != null) { + System.err.println("Warning: Component not disconnected properly\n"); + } + onAvailableChangedConnection.disconnect(); + onMessageReceivedConnection.disconnect(); + onAvailableChangedConnection.disconnect(); + } + + public void connect(final String host, int port) { + assert(connector_ == null); + connector_ = ComponentConnector.create(host, port, networkFactories.getDomainNameResolver(), networkFactories.getConnectionFactory(), networkFactories.getTimerFactory()); + onConnectFinishedConnection = connector_.onConnectFinished.connect(new Slot1<Connection>() { + @Override + public void call(Connection c1) { + handleConnectorFinished(c1); + } + }); + connector_.setTimeoutMilliseconds(60*1000); + connector_.start(); + } + + public void disconnect() { + // FIXME: We should be able to do without this boolean. We just have to make sure we can tell the difference between + // connector finishing without a connection due to an error or because of a disconnect. + disconnectRequested_ = true; + if (session_ != null) { + session_.finish(); + } + else if (connector_ != null) { + connector_.stop(); + assert(session_ == null); + } + //assert(!session_); /* commenting out until we have time to refactor to be like CoreClient */ + //assert(!sessionStream_); + //assert(!connector_); + disconnectRequested_ = false; + } + + public void sendMessage(Message message) { + stanzaChannel_.sendMessage(message); + } + + public void sendPresence(Presence presence) { + stanzaChannel_.sendPresence(presence); + } + + public void sendData(final String data) { + sessionStream_.writeData(data); + } + + public IQRouter getIQRouter() { + return iqRouter_; + } + + public StanzaChannel getStanzaChannel() { + return stanzaChannel_; + } + + public boolean isAvailable() { + return stanzaChannel_.isAvailable(); + } + + /** + * Returns the JID of the component + */ + public JID getJID() { + return jid_; + } + + private void handleConnectorFinished(Connection connection) { + onConnectFinishedConnection.disconnect(); + connector_ = null; + if (connection == null) { + if (!disconnectRequested_) { + onError.emit(new ComponentError(ComponentError.Type.ConnectionError)); + } + } + else { + assert(connection_ == null); + connection_ = connection; + + assert(sessionStream_ == null); + //TODO: PORT TLSOPTIONS. + sessionStream_ = new BasicSessionStream(StreamType.ComponentStreamType, connection_, getPayloadParserFactories(), getPayloadSerializers(), null, networkFactories.getTimerFactory()); + onDataReadConnection = sessionStream_.onDataRead.connect(new Slot1<SafeByteArray>() { + @Override + public void call(SafeByteArray s1) { + handleDataRead(s1); + } + }); + onDataWrittenConnection = sessionStream_.onDataWritten.connect(new Slot1<SafeByteArray>() { + @Override + public void call(SafeByteArray s1) { + handleDataWritten(s1); + } + }); + + session_ = ComponentSession.create(jid_, secret_, sessionStream_, networkFactories.getCryptoProvider()); + stanzaChannel_.setSession(session_); + onFinishedConnection = session_.onFinished.connect(new Slot1<com.isode.stroke.base.Error>() { + @Override + public void call(com.isode.stroke.base.Error e1) { + handleSessionFinished(e1); + } + }); + session_.start(); + } + } + + private void handleStanzaChannelAvailableChanged(boolean available) { + if (available) { + onConnected.emit(); + } + } + + private void handleSessionFinished(Error error) { + onFinishedConnection.disconnect(); + session_ = null; + + onDataReadConnection.disconnect(); + onDataWrittenConnection.disconnect(); + sessionStream_ = null; + + connection_.disconnect(); + connection_ = null; + + if (error != null) { + ComponentError componentError = new ComponentError(); + if(error instanceof ComponentSession.Error) { + ComponentSession.Error actualError = (ComponentSession.Error)error; + switch(actualError.type) { + case AuthenticationFailedError: + componentError = new ComponentError(ComponentError.Type.AuthenticationFailedError); + break; + case UnexpectedElementError: + componentError = new ComponentError(ComponentError.Type.UnexpectedElementError); + break; + } + } + else if(error instanceof SessionStream.Error) { + SessionStream.Error actualError = (SessionStream.Error)error; + switch(actualError.type) { + case ParseError: + componentError = new ComponentError(ComponentError.Type.XMLError); + break; + case TLSError: + assert(false); + componentError = new ComponentError(ComponentError.Type.UnknownError); + break; + case InvalidTLSCertificateError: + assert(false); + componentError = new ComponentError(ComponentError.Type.UnknownError); + break; + case ConnectionReadError: + componentError = new ComponentError(ComponentError.Type.ConnectionReadError); + break; + case ConnectionWriteError: + componentError = new ComponentError(ComponentError.Type.ConnectionWriteError); + break; + } + } + onError.emit(componentError); + } + } + + private void handleDataRead(SafeByteArray data) { + onDataRead.emit(data); + } + + private void handleDataWritten(SafeByteArray data) { + onDataWritten.emit(data); + } +} diff --git a/src/com/isode/stroke/elements/ComponentHandshake.java b/src/com/isode/stroke/elements/ComponentHandshake.java index f5ae830..0e42988 100644 --- a/src/com/isode/stroke/elements/ComponentHandshake.java +++ b/src/com/isode/stroke/elements/ComponentHandshake.java @@ -12,9 +12,9 @@ package com.isode.stroke.elements; import com.isode.stroke.base.NotNull; -import com.isode.stroke.elements.TopLevelElement; +import com.isode.stroke.elements.Element; -public class ComponentHandshake implements TopLevelElement { +public class ComponentHandshake implements Element { private String data = ""; diff --git a/src/com/isode/stroke/entity/Entity.java b/src/com/isode/stroke/entity/Entity.java new file mode 100644 index 0000000..80395ca --- /dev/null +++ b/src/com/isode/stroke/entity/Entity.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2010 Isode Limited. + * All rights reserved. + * See the COPYING file for more information. + */ +/* + * Copyright (c) 2015 Tarun Gupta. + * Licensed under the simplified BSD license. + * See Documentation/Licenses/BSD-simplified.txt for more information. + */ + +package com.isode.stroke.entity; + +import com.isode.stroke.parser.PayloadParserFactory; +import com.isode.stroke.serializer.PayloadSerializer; +import com.isode.stroke.parser.payloadparsers.FullPayloadParserFactoryCollection; +import com.isode.stroke.serializer.payloadserializers.FullPayloadSerializerCollection; +import com.isode.stroke.parser.PayloadParserFactoryCollection; +import com.isode.stroke.serializer.PayloadSerializerCollection; + +/** + * The base class for XMPP entities (Clients, Components). + */ +public class Entity { + + private FullPayloadParserFactoryCollection payloadParserFactories; + private FullPayloadSerializerCollection payloadSerializers; + + public Entity() { + payloadParserFactories = new FullPayloadParserFactoryCollection(); + payloadSerializers = new FullPayloadSerializerCollection(); + } + + public void addPayloadParserFactory(PayloadParserFactory payloadParserFactory) { + payloadParserFactories.addFactory(payloadParserFactory); + } + + public void removePayloadParserFactory(PayloadParserFactory payloadParserFactory) { + payloadParserFactories.removeFactory(payloadParserFactory); + } + + public void addPayloadSerializer(PayloadSerializer payloadSerializer) { + payloadSerializers.addSerializer(payloadSerializer); + } + + public void removePayloadSerializer(PayloadSerializer payloadSerializer) { + payloadSerializers.removeSerializer(payloadSerializer); + } + + protected PayloadParserFactoryCollection getPayloadParserFactories() { + return payloadParserFactories; + } + + protected PayloadSerializerCollection getPayloadSerializers() { + return payloadSerializers; + } +}
\ No newline at end of file diff --git a/src/com/isode/stroke/examples/gui/StrokeGUI.java b/src/com/isode/stroke/examples/gui/StrokeGUI.java index 282fde6..6fc07de 100644 --- a/src/com/isode/stroke/examples/gui/StrokeGUI.java +++ b/src/com/isode/stroke/examples/gui/StrokeGUI.java @@ -11,6 +11,7 @@ package com.isode.stroke.examples.gui; +import com.isode.stroke.base.SafeByteArray; import com.isode.stroke.client.ClientError; import com.isode.stroke.client.ClientOptions; import com.isode.stroke.client.CoreClient; @@ -179,15 +180,15 @@ public class StrokeGUI extends javax.swing.JFrame { thisObject.handleClientError(p1); } }); - client_.onDataRead.connect(new Slot1<String>() { + client_.onDataRead.connect(new Slot1<SafeByteArray>() { - public void call(String p1) { + public void call(SafeByteArray p1) { xmlText_.append(">>> " + p1 + "\n"); } }); - client_.onDataWritten.connect(new Slot1<String>() { + client_.onDataWritten.connect(new Slot1<SafeByteArray>() { - public void call(String p1) { + public void call(SafeByteArray p1) { xmlText_.append("<<< " + p1 + "\n"); } }); diff --git a/src/com/isode/stroke/parser/PayloadParserFactoryCollection.java b/src/com/isode/stroke/parser/PayloadParserFactoryCollection.java index 6d38ebd..d034826 100644 --- a/src/com/isode/stroke/parser/PayloadParserFactoryCollection.java +++ b/src/com/isode/stroke/parser/PayloadParserFactoryCollection.java @@ -24,6 +24,12 @@ public class PayloadParserFactoryCollection { } } + public void removeFactory(PayloadParserFactory factory) { + while(factories_.contains(factory)) { + factories_.remove(factory); + } + } + public void setDefaultFactory(PayloadParserFactory factory) { defaultFactory_ = factory; } diff --git a/src/com/isode/stroke/session/BasicSessionStream.java b/src/com/isode/stroke/session/BasicSessionStream.java index 4f0e75b..9ba862d 100644 --- a/src/com/isode/stroke/session/BasicSessionStream.java +++ b/src/com/isode/stroke/session/BasicSessionStream.java @@ -175,6 +175,10 @@ public class BasicSessionStream extends SessionStream { return tlsLayer.getContext().getFinishMessage(); } + public boolean supportsZLibCompression() { + return true; + } + public void addZLibCompression() { compressionLayer = new CompressionLayer(); streamStack.addLayer(compressionLayer); @@ -232,12 +236,12 @@ public class BasicSessionStream extends SessionStream { } } - private void handleDataRead(ByteArray data) { - onDataRead.emit(data.toString()); + private void handleDataRead(SafeByteArray data) { + onDataRead.emit(data); } - private void handleDataWritten(ByteArray data) { - onDataWritten.emit(data.toString()); + private void handleDataWritten(SafeByteArray data) { + onDataWritten.emit(data); } @Override public String toString() { diff --git a/src/com/isode/stroke/session/SessionStream.java b/src/com/isode/stroke/session/SessionStream.java index 2b9932b..3171ec0 100644 --- a/src/com/isode/stroke/session/SessionStream.java +++ b/src/com/isode/stroke/session/SessionStream.java @@ -10,6 +10,7 @@ package com.isode.stroke.session; import java.util.List; +import com.isode.stroke.base.SafeByteArray; import com.isode.stroke.base.ByteArray; import com.isode.stroke.elements.Element; import com.isode.stroke.elements.ProtocolHeader; @@ -50,6 +51,8 @@ public abstract class SessionStream { public abstract void writeData(String data); + public abstract boolean supportsZLibCompression(); + public abstract void addZLibCompression(); public abstract boolean supportsTLSEncryption(); @@ -81,8 +84,8 @@ public abstract class SessionStream { public final Signal1<Element> onElementReceived = new Signal1<Element>(); public final Signal1<Error> onClosed = new Signal1<Error>(); public final Signal onTLSEncrypted = new Signal(); - public final Signal1<String> onDataRead = new Signal1<String>(); - public final Signal1<String> onDataWritten = new Signal1<String>(); + public final Signal1<SafeByteArray> onDataRead = new Signal1<SafeByteArray>(); + public final Signal1<SafeByteArray> onDataWritten = new Signal1<SafeByteArray>(); protected CertificateWithKey getTLSCertificate() { return certificate; } diff --git a/test/com/isode/stroke/component/ComponentSessionTest.java b/test/com/isode/stroke/component/ComponentSessionTest.java new file mode 100644 index 0000000..c7fa36b --- /dev/null +++ b/test/com/isode/stroke/component/ComponentSessionTest.java @@ -0,0 +1,248 @@ +/* + * Copyright (c) 2010-2014 Isode Limited. + * All rights reserved. + * See the COPYING file for more information. + */ +/* + * Copyright (c) 2015 Tarun Gupta. + * Licensed under the simplified BSD license. + * See Documentation/Licenses/BSD-simplified.txt for more information. + */ + +package com.isode.stroke.component; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Test; +import org.junit.Before; +import com.isode.stroke.component.ComponentSession; +import com.isode.stroke.elements.ComponentHandshake; +import com.isode.stroke.elements.AuthFailure; +import com.isode.stroke.elements.ProtocolHeader; +import com.isode.stroke.elements.Element; +import com.isode.stroke.base.ByteArray; +import com.isode.stroke.tls.Certificate; +import com.isode.stroke.tls.CertificateVerificationError; +import com.isode.stroke.crypto.CryptoProvider; +import com.isode.stroke.crypto.JavaCryptoProvider; +import com.isode.stroke.session.SessionStream; +import com.isode.stroke.jid.JID; +import com.isode.stroke.signals.Slot1; +import java.util.Vector; + +public class ComponentSessionTest { + + private class MockSessionStream extends SessionStream { + + class Event { + + public Event(Element element) { + this.element = element; + this.footer = false; + } + + public Event(final ProtocolHeader header) { + this.header = header; + this.footer = false; + } + + public Event() { + this.footer = true; + } + + public Element element; + public ProtocolHeader header; + public boolean footer; + }; + + public MockSessionStream() { + this.available = true; + this.whitespacePingEnabled = false; + this.resetCount = 0; + } + + public void close() { + onClosed.emit((SessionStream.Error)null); + } + + public boolean isOpen() { + return available; + } + + public void writeHeader(final ProtocolHeader header) { + receivedEvents.add(new Event(header)); + } + + public void writeFooter() { + receivedEvents.add(new Event()); + } + + public void writeElement(Element element) { + receivedEvents.add(new Event(element)); + } + + public void writeData(final String str) { + + } + + public boolean supportsTLSEncryption() { + return false; + } + + public void addTLSEncryption() { + assert(false); + } + + public boolean isTLSEncrypted() { + return false; + } + + public ByteArray getTLSFinishMessage() { + return new ByteArray(); + } + + public Certificate getPeerCertificate() { + return (Certificate)null; + } + + public Vector<Certificate> getPeerCertificateChain() { + return (Vector<Certificate>)null; + } + + public CertificateVerificationError getPeerCertificateVerificationError() { + return (CertificateVerificationError)null; + } + + public boolean supportsZLibCompression() { + return true; + } + + public void addZLibCompression() { + assert(false); + } + + public void setWhitespacePingEnabled(boolean enabled) { + whitespacePingEnabled = enabled; + } + + public void resetXMPPParser() { + resetCount++; + } + + public void breakConnection() { + onClosed.emit(new SessionStream.Error(SessionStream.Error.Type.ConnectionReadError)); + } + + public void sendStreamStart() { + ProtocolHeader header = new ProtocolHeader(); + header.setFrom("service.foo.com"); + onStreamStartReceived.emit(header); + } + + public void sendHandshakeResponse() { + onElementReceived.emit(new ComponentHandshake()); + } + + public void sendHandshakeError() { + // FIXME: This isn't the correct element + onElementReceived.emit(new AuthFailure()); + } + + public void receiveStreamStart() { + Event event = popEvent(); + assertNotNull(event.header); + } + + public void receiveHandshake() { + Event event = popEvent(); + assertNotNull(event.element); + ComponentHandshake handshake = (ComponentHandshake)(event.element); + assertNotNull(handshake); + assertEquals("4c4f8a41141722c8bbfbdd92d827f7b2fc0a542b", handshake.getData()); + } + + public Event popEvent() { + assertFalse(receivedEvents.isEmpty()); + Event event = receivedEvents.firstElement(); + receivedEvents.remove(receivedEvents.firstElement()); + return event; + } + + public boolean available; + public boolean whitespacePingEnabled; + public String bindID = ""; + public int resetCount; + public Vector<Event> receivedEvents = new Vector<Event>(); + }; + + private ComponentSession createSession() { + ComponentSession session = ComponentSession.create(new JID("service.foo.com"), "servicesecret", server, crypto); + session.onFinished.connect(new Slot1<com.isode.stroke.base.Error>() { + @Override + public void call(com.isode.stroke.base.Error e1) { + handleSessionFinished(e1); + } + }); + return session; + } + + private void handleSessionFinished(com.isode.stroke.base.Error error) { + sessionFinishedReceived = true; + sessionFinishedError = error; + } + + private MockSessionStream server; + private boolean sessionFinishedReceived; + private com.isode.stroke.base.Error sessionFinishedError; + private CryptoProvider crypto; + + @Before + public void setUp() { + server = new MockSessionStream(); + sessionFinishedReceived = false; + crypto = new JavaCryptoProvider(); + } + + @Test + public void testStart() { + ComponentSession session = createSession(); + session.start(); + server.receiveStreamStart(); + server.sendStreamStart(); + server.receiveHandshake(); + server.sendHandshakeResponse(); + + assertNotNull(server.whitespacePingEnabled); + + session.finish(); + assertFalse(server.whitespacePingEnabled); + } + + @Test + public void testStart_Error() { + ComponentSession session = createSession(); + session.start(); + server.breakConnection(); + + assertEquals(ComponentSession.State.Finished, session.getState()); + assertTrue(sessionFinishedReceived); + assertNotNull(sessionFinishedError); + } + + @Test + public void testStart_Unauthorized() { + ComponentSession session = createSession(); + session.start(); + server.receiveStreamStart(); + server.sendStreamStart(); + server.receiveHandshake(); + server.sendHandshakeError(); + + assertEquals(ComponentSession.State.Finished, session.getState()); + assertTrue(sessionFinishedReceived); + assertNotNull(sessionFinishedError); + } +} diff --git a/test/com/isode/stroke/pubsub/Client.java b/test/com/isode/stroke/pubsub/Client.java index 712db00..c68ba96 100644 --- a/test/com/isode/stroke/pubsub/Client.java +++ b/test/com/isode/stroke/pubsub/Client.java @@ -15,6 +15,7 @@ import com.isode.stroke.network.JavaNetworkFactories; import com.isode.stroke.queries.IQRouter; import com.isode.stroke.signals.Slot; import com.isode.stroke.signals.Slot1; +import com.isode.stroke.base.SafeByteArray; public class Client { @@ -49,8 +50,8 @@ public class Client { } }); - client_.onDataRead.connect(new Slot1<String>() { - public void call(String xml) { + client_.onDataRead.connect(new Slot1<SafeByteArray>() { + public void call(SafeByteArray xml) { if (!connecting_ && !disconnecting_) { if (debugInfoXml) { System.out.println("[" + name_ + "] Client.Read:"); @@ -60,8 +61,8 @@ public class Client { } }); - client_.onDataWritten.connect(new Slot1<String>() { - public void call(String xml) { + client_.onDataWritten.connect(new Slot1<SafeByteArray>() { + public void call(SafeByteArray xml) { if (!connecting_ && !disconnecting_) { if (debugInfoXml) { System.out.println("[" + name_ + "] Client.Write:"); |