diff options
Diffstat (limited to 'src/com/isode')
-rw-r--r-- | src/com/isode/stroke/network/JavaConnection.java | 95 |
1 files changed, 58 insertions, 37 deletions
diff --git a/src/com/isode/stroke/network/JavaConnection.java b/src/com/isode/stroke/network/JavaConnection.java index 08cd497..ade52f0 100644 --- a/src/com/isode/stroke/network/JavaConnection.java +++ b/src/com/isode/stroke/network/JavaConnection.java @@ -9,9 +9,9 @@ package com.isode.stroke.network; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -26,8 +26,6 @@ public class JavaConnection extends Connection implements EventOwner { private class Worker implements Runnable { private final HostAddressPort address_; - private OutputStream write_; - private InputStream read_; private final List<ByteArray> writeBuffer_ = Collections.synchronizedList(new ArrayList<ByteArray>()); public Worker(HostAddressPort address) { @@ -36,55 +34,78 @@ public class JavaConnection extends Connection implements EventOwner { public void run() { try { - socket_ = new Socket(address_.getAddress().getInetAddress(), address_.getPort()); - write_ = socket_.getOutputStream(); - read_ = socket_.getInputStream(); + InetSocketAddress isa = new InetSocketAddress(address_.getAddress().getInetAddress(),address_.getPort()); + socketChannel_ = SocketChannel.open(isa); + + /* By default, SocketChannels start off in blocking mode, which + * isn't what we want + */ + socketChannel_.configureBlocking(false); } catch (IOException ex) { handleConnected(true); return; } handleConnected(false); while (!disconnecting_) { - boolean didWrite = false; while (!writeBuffer_.isEmpty()) { - didWrite = true; ByteArray data = writeBuffer_.get(0); - for (byte datum : data.getData()) { - try { - write_.write(datum); - } catch (IOException ex) { - disconnecting_ = true; - handleDisconnected(Error.WriteError); - } - } - writeBuffer_.remove(0); - } - if (didWrite && !disconnecting_) { + ByteBuffer bb = ByteBuffer.wrap(data.getData()); try { - write_.flush(); + /* Because the SocketChannel is non-blocking, we have to + * be prepared to cope with the write operation not + * consuming all of the data + */ + boolean finishedWriting = false; + while (!finishedWriting && !disconnecting_) { + socketChannel_.write(bb); + finishedWriting = (bb.remaining() == 0); + if (!finishedWriting) { + try { + /* Give the output buffer a chance to empty */ + Thread.sleep(100); + } + catch (InterruptedException e) { + /* Perhaps someone has set disconnecting_ */ + } + } + } } catch (IOException ex) { disconnecting_ = true; - handleDisconnected(Error.WriteError); + handleDisconnected(Error.WriteError); } + writeBuffer_.remove(0); } + ByteArray data = new ByteArray(); try { - while (read_.available() != 0) { - byte[] b = new byte[1024]; - int i = read_.read(b,0,b.length); - if (i > 0) { - for (int j=0; j<i; j++) { - data.append(b[j]); - } + ByteBuffer bb = ByteBuffer.allocate(1024); + + int count = socketChannel_.read(bb); + while (count > 0) { + bb.flip(); + byte[] result = new byte[bb.remaining()]; + bb.get(result); + bb.compact(); + for (int i=0; i<result.length; i++) { + data.append(result[i]); } - } + count = socketChannel_.read(bb); + } + if (count == -1) { + /* socketChannel input has reached "end-of-stream", which + * we regard as meaning that the socket has been closed + */ + throw new IOException("socketChannel_.read returned -1"); + } } catch (IOException ex) { handleDisconnected(Error.ReadError); return; } + if (!data.isEmpty()) { handleDataRead(data); } + try { Thread.sleep(100); } catch (InterruptedException ex) { @@ -92,9 +113,7 @@ public class JavaConnection extends Connection implements EventOwner { } } try { - read_.close(); - write_.close(); - socket_.close(); + socketChannel_.close(); } catch (IOException ex) { /* Do we need to return an error if we're already trying to close? */ } @@ -145,6 +164,7 @@ public class JavaConnection extends Connection implements EventOwner { @Override public void connect(HostAddressPort address) { + hostAddressPort_ = address; worker_ = new Worker(address); Thread workerThread = new Thread(worker_); workerThread.setDaemon(true); @@ -163,20 +183,21 @@ public class JavaConnection extends Connection implements EventOwner { @Override public HostAddressPort getLocalAddress() { - return new HostAddressPort(new HostAddress(socket_.getLocalAddress()), socket_.getLocalPort()); + return hostAddressPort_; } @Override public String toString() { return "JavaConnection " + - (socket_ == null ? "with no socket configured" : "for " + getLocalAddress()) + + (socketChannel_ == null ? "with no socket configured" : "for " + getLocalAddress()) + (disconnecting_ ? " (disconnecting)" : ""); } private final EventLoop eventLoop_; private boolean disconnecting_ = false; - private Socket socket_; + private SocketChannel socketChannel_; private Worker worker_; + private HostAddressPort hostAddressPort_; } |