diff options
author | Nick Hudson <nick.hudson@isode.com> | 2012-03-12 17:11:31 (GMT) |
---|---|---|
committer | Kevin Smith <git@kismith.co.uk> | 2012-03-13 10:45:12 (GMT) |
commit | b2f96e0548abbdaaa097b4908ff4583b4c642772 (patch) | |
tree | eeff86c1370ea179c3a41729074bdabe497e0a7d | |
parent | c5392b36c368ebdca2e8ab356eb0d1fb0d36a5cb (diff) | |
download | stroke-b2f96e0548abbdaaa097b4908ff4583b4c642772.zip stroke-b2f96e0548abbdaaa097b4908ff4583b4c642772.tar.bz2 |
Update JavaConnection to use SocketChannel rather than Socket
The initial implementation of JavaConnection used the "Socket" class,
from which it derived InputStream and OutputStream objects to read/write data.
In order to avoid blocking, the "read" loop would only attempt to read
data if the InputStream.available() method indicated that there was
data ready to be read. However, in order to determine that an
InputStream has been closed, you have to read from it and have it
return -1 to indicate "end-of-stream".
There was no explicit test for a -1 return, but even if there had
been, it wouldn't have tripped, because of the "available()" test.
So this change makes JavaConnection use "SocketChannel" rather than
Socket, which allows (when you configure the SocketChannel to be
non-blocking) you to issue non-blocking reads which *can* return -1
when the input stream has finished.
Test-information:
Tested with test program and MLC, using both TLS and non-TLS
connections. Applications still work as expected.
Observed that when I deliberately stop the server, or break the socket
connection, the client almost immediately gets a "disconnected"
signal.
Prior to this change, stopping the server results in the client
getting a disconnected signal only when it next tries to write
something.
-rw-r--r-- | src/com/isode/stroke/network/JavaConnection.java | 95 |
1 files changed, 58 insertions, 37 deletions
diff --git a/src/com/isode/stroke/network/JavaConnection.java b/src/com/isode/stroke/network/JavaConnection.java index 08cd497..ade52f0 100644 --- a/src/com/isode/stroke/network/JavaConnection.java +++ b/src/com/isode/stroke/network/JavaConnection.java @@ -9,9 +9,9 @@ package com.isode.stroke.network; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -26,8 +26,6 @@ public class JavaConnection extends Connection implements EventOwner { private class Worker implements Runnable { private final HostAddressPort address_; - private OutputStream write_; - private InputStream read_; private final List<ByteArray> writeBuffer_ = Collections.synchronizedList(new ArrayList<ByteArray>()); public Worker(HostAddressPort address) { @@ -36,55 +34,78 @@ public class JavaConnection extends Connection implements EventOwner { public void run() { try { - socket_ = new Socket(address_.getAddress().getInetAddress(), address_.getPort()); - write_ = socket_.getOutputStream(); - read_ = socket_.getInputStream(); + InetSocketAddress isa = new InetSocketAddress(address_.getAddress().getInetAddress(),address_.getPort()); + socketChannel_ = SocketChannel.open(isa); + + /* By default, SocketChannels start off in blocking mode, which + * isn't what we want + */ + socketChannel_.configureBlocking(false); } catch (IOException ex) { handleConnected(true); return; } handleConnected(false); while (!disconnecting_) { - boolean didWrite = false; while (!writeBuffer_.isEmpty()) { - didWrite = true; ByteArray data = writeBuffer_.get(0); - for (byte datum : data.getData()) { - try { - write_.write(datum); - } catch (IOException ex) { - disconnecting_ = true; - handleDisconnected(Error.WriteError); - } - } - writeBuffer_.remove(0); - } - if (didWrite && !disconnecting_) { + ByteBuffer bb = ByteBuffer.wrap(data.getData()); try { - write_.flush(); + /* Because the SocketChannel is non-blocking, we have to + * be prepared to cope with the write operation not + * consuming all of the data + */ + boolean finishedWriting = false; + while (!finishedWriting && !disconnecting_) { + socketChannel_.write(bb); + finishedWriting = (bb.remaining() == 0); + if (!finishedWriting) { + try { + /* Give the output buffer a chance to empty */ + Thread.sleep(100); + } + catch (InterruptedException e) { + /* Perhaps someone has set disconnecting_ */ + } + } + } } catch (IOException ex) { disconnecting_ = true; - handleDisconnected(Error.WriteError); + handleDisconnected(Error.WriteError); } + writeBuffer_.remove(0); } + ByteArray data = new ByteArray(); try { - while (read_.available() != 0) { - byte[] b = new byte[1024]; - int i = read_.read(b,0,b.length); - if (i > 0) { - for (int j=0; j<i; j++) { - data.append(b[j]); - } + ByteBuffer bb = ByteBuffer.allocate(1024); + + int count = socketChannel_.read(bb); + while (count > 0) { + bb.flip(); + byte[] result = new byte[bb.remaining()]; + bb.get(result); + bb.compact(); + for (int i=0; i<result.length; i++) { + data.append(result[i]); } - } + count = socketChannel_.read(bb); + } + if (count == -1) { + /* socketChannel input has reached "end-of-stream", which + * we regard as meaning that the socket has been closed + */ + throw new IOException("socketChannel_.read returned -1"); + } } catch (IOException ex) { handleDisconnected(Error.ReadError); return; } + if (!data.isEmpty()) { handleDataRead(data); } + try { Thread.sleep(100); } catch (InterruptedException ex) { @@ -92,9 +113,7 @@ public class JavaConnection extends Connection implements EventOwner { } } try { - read_.close(); - write_.close(); - socket_.close(); + socketChannel_.close(); } catch (IOException ex) { /* Do we need to return an error if we're already trying to close? */ } @@ -145,6 +164,7 @@ public class JavaConnection extends Connection implements EventOwner { @Override public void connect(HostAddressPort address) { + hostAddressPort_ = address; worker_ = new Worker(address); Thread workerThread = new Thread(worker_); workerThread.setDaemon(true); @@ -163,20 +183,21 @@ public class JavaConnection extends Connection implements EventOwner { @Override public HostAddressPort getLocalAddress() { - return new HostAddressPort(new HostAddress(socket_.getLocalAddress()), socket_.getLocalPort()); + return hostAddressPort_; } @Override public String toString() { return "JavaConnection " + - (socket_ == null ? "with no socket configured" : "for " + getLocalAddress()) + + (socketChannel_ == null ? "with no socket configured" : "for " + getLocalAddress()) + (disconnecting_ ? " (disconnecting)" : ""); } private final EventLoop eventLoop_; private boolean disconnecting_ = false; - private Socket socket_; + private SocketChannel socketChannel_; private Worker worker_; + private HostAddressPort hostAddressPort_; } |