summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKevin Smith <git@kismith.co.uk>2011-07-01 09:19:49 (GMT)
committerKevin Smith <git@kismith.co.uk>2011-07-01 09:19:49 (GMT)
commit2da71a8a85486a494343f1662d64fb5ae5a2a44e (patch)
tree23992f9f2a00bac23b345e5c2cc9c1194efc25be /src/com/isode/stroke/network/JavaConnection.java
downloadstroke-2da71a8a85486a494343f1662d64fb5ae5a2a44e.zip
stroke-2da71a8a85486a494343f1662d64fb5ae5a2a44e.tar.bz2
Initial import
Diffstat (limited to 'src/com/isode/stroke/network/JavaConnection.java')
-rw-r--r--src/com/isode/stroke/network/JavaConnection.java176
1 files changed, 176 insertions, 0 deletions
diff --git a/src/com/isode/stroke/network/JavaConnection.java b/src/com/isode/stroke/network/JavaConnection.java
new file mode 100644
index 0000000..e43707c
--- /dev/null
+++ b/src/com/isode/stroke/network/JavaConnection.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright (c) 2010 Remko Tron¨on
+ * Licensed under the GNU General Public License v3.
+ * See Documentation/Licenses/GPLv3.txt for more information.
+ */
+/*
+ * Copyright (c) 2010-2011, Isode Limited, London, England.
+ * All rights reserved.
+ */
+package com.isode.stroke.network;
+
+import com.isode.stroke.base.ByteArray;
+import com.isode.stroke.eventloop.Event.Callback;
+import com.isode.stroke.eventloop.EventLoop;
+import com.isode.stroke.eventloop.EventOwner;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class JavaConnection extends Connection implements EventOwner {
+
+ private class Worker implements Runnable {
+
+ private final HostAddressPort address_;
+ private OutputStream write_;
+ private BufferedReader read_;
+ private final List<ByteArray> writeBuffer_ = Collections.synchronizedList(new ArrayList<ByteArray>());
+
+ public Worker(HostAddressPort address) {
+ address_ = address;
+ }
+
+ public void run() {
+ try {
+ socket_ = new Socket(address_.getAddress().getInetAddress(), address_.getPort());
+ write_ = socket_.getOutputStream();
+ read_ = new BufferedReader(new InputStreamReader(socket_.getInputStream(), "utf-8"));
+ } catch (IOException ex) {
+ handleConnected(true);
+ return;
+ }
+ handleConnected(false);
+ while (!disconnecting_) {
+ boolean didWrite = false;
+ while (!writeBuffer_.isEmpty()) {
+ didWrite = true;
+ ByteArray data = writeBuffer_.get(0);
+ for (byte datum : data.getData()) {
+ try {
+ write_.write(datum);
+ } catch (IOException ex) {
+ disconnecting_ = true;
+ handleDisconnected(Error.WriteError);
+ }
+ }
+ writeBuffer_.remove(0);
+ }
+ if (didWrite && !disconnecting_) {
+ try {
+ write_.flush();
+ } catch (IOException ex) {
+ disconnecting_ = true;
+ handleDisconnected(Error.WriteError);
+ }
+ }
+ ByteArray data = new ByteArray();
+ try {
+ while (read_.ready()) {
+ char[] c = new char[1024];
+ int i = read_.read(c, 0, c.length);
+ if (i > 0) {
+ data.append(new String(c, 0, i));
+ }
+ }
+ } catch (IOException ex) {
+ handleDisconnected(Error.ReadError);
+ return;
+ }
+ if (!data.isEmpty()) {
+ handleDataRead(data);
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ex) {
+ /* We've been woken up, probably to force us to do something.*/
+ }
+ }
+ try {
+ read_.close();
+ write_.close();
+ socket_.close();
+ } catch (IOException ex) {
+ /* Do we need to return an error if we're already trying to close? */
+ }
+ }
+
+ private void handleConnected(final boolean error) {
+ eventLoop_.postEvent(new Callback() {
+ public void run() {
+ onConnectFinished.emit(error);
+ }
+ });
+ }
+
+ private void handleDisconnected(final Error error) {
+ eventLoop_.postEvent(new Callback() {
+ public void run() {
+ onDisconnected.emit(error);
+ }
+ });
+ }
+
+ private void handleDataRead(final ByteArray data) {
+ eventLoop_.postEvent(new Callback() {
+ public void run() {
+ onDataRead.emit(data);
+ }
+ });
+ }
+
+ public void write(ByteArray data) {
+ writeBuffer_.add(data);
+ }
+ }
+
+ private JavaConnection(EventLoop eventLoop) {
+ eventLoop_ = eventLoop;
+ }
+
+ public static JavaConnection create(EventLoop eventLoop) {
+ return new JavaConnection(eventLoop);
+ }
+
+ @Override
+ public void listen() {
+ //TODO: needed for server, not for client.
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public void connect(HostAddressPort address) {
+ worker_ = new Worker(address);
+ Thread workerThread = new Thread(worker_);
+ workerThread.setDaemon(true);
+ workerThread.start();
+ }
+
+ @Override
+ public void disconnect() {
+ disconnecting_ = true;
+ }
+
+ @Override
+ public void write(ByteArray data) {
+ worker_.writeBuffer_.add(data);
+ }
+
+ @Override
+ public HostAddressPort getLocalAddress() {
+ return new HostAddressPort(new HostAddress(socket_.getLocalAddress()), socket_.getLocalPort());
+ }
+ private final EventLoop eventLoop_;
+ private boolean disconnecting_ = false;
+ private Socket socket_;
+ private Worker worker_;
+}