summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNick Hudson <nick.hudson@isode.com>2012-03-12 17:11:31 (GMT)
committerKevin Smith <git@kismith.co.uk>2012-03-13 10:45:12 (GMT)
commitb2f96e0548abbdaaa097b4908ff4583b4c642772 (patch)
treeeeff86c1370ea179c3a41729074bdabe497e0a7d
parentc5392b36c368ebdca2e8ab356eb0d1fb0d36a5cb (diff)
downloadstroke-b2f96e0548abbdaaa097b4908ff4583b4c642772.zip
stroke-b2f96e0548abbdaaa097b4908ff4583b4c642772.tar.bz2
Update JavaConnection to use SocketChannel rather than Socket
The initial implementation of JavaConnection used the "Socket" class, from which it derived InputStream and OutputStream objects to read/write data. In order to avoid blocking, the "read" loop would only attempt to read data if the InputStream.available() method indicated that there was data ready to be read. However, in order to determine that an InputStream has been closed, you have to read from it and have it return -1 to indicate "end-of-stream". There was no explicit test for a -1 return, but even if there had been, it wouldn't have tripped, because of the "available()" test. So this change makes JavaConnection use "SocketChannel" rather than Socket, which allows (when you configure the SocketChannel to be non-blocking) you to issue non-blocking reads which *can* return -1 when the input stream has finished. Test-information: Tested with test program and MLC, using both TLS and non-TLS connections. Applications still work as expected. Observed that when I deliberately stop the server, or break the socket connection, the client almost immediately gets a "disconnected" signal. Prior to this change, stopping the server results in the client getting a disconnected signal only when it next tries to write something.
-rw-r--r--src/com/isode/stroke/network/JavaConnection.java95
1 files changed, 58 insertions, 37 deletions
diff --git a/src/com/isode/stroke/network/JavaConnection.java b/src/com/isode/stroke/network/JavaConnection.java
index 08cd497..ade52f0 100644
--- a/src/com/isode/stroke/network/JavaConnection.java
+++ b/src/com/isode/stroke/network/JavaConnection.java
@@ -9,9 +9,9 @@
package com.isode.stroke.network;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -26,8 +26,6 @@ public class JavaConnection extends Connection implements EventOwner {
private class Worker implements Runnable {
private final HostAddressPort address_;
- private OutputStream write_;
- private InputStream read_;
private final List<ByteArray> writeBuffer_ = Collections.synchronizedList(new ArrayList<ByteArray>());
public Worker(HostAddressPort address) {
@@ -36,55 +34,78 @@ public class JavaConnection extends Connection implements EventOwner {
public void run() {
try {
- socket_ = new Socket(address_.getAddress().getInetAddress(), address_.getPort());
- write_ = socket_.getOutputStream();
- read_ = socket_.getInputStream();
+ InetSocketAddress isa = new InetSocketAddress(address_.getAddress().getInetAddress(),address_.getPort());
+ socketChannel_ = SocketChannel.open(isa);
+
+ /* By default, SocketChannels start off in blocking mode, which
+ * isn't what we want
+ */
+ socketChannel_.configureBlocking(false);
} catch (IOException ex) {
handleConnected(true);
return;
}
handleConnected(false);
while (!disconnecting_) {
- boolean didWrite = false;
while (!writeBuffer_.isEmpty()) {
- didWrite = true;
ByteArray data = writeBuffer_.get(0);
- for (byte datum : data.getData()) {
- try {
- write_.write(datum);
- } catch (IOException ex) {
- disconnecting_ = true;
- handleDisconnected(Error.WriteError);
- }
- }
- writeBuffer_.remove(0);
- }
- if (didWrite && !disconnecting_) {
+ ByteBuffer bb = ByteBuffer.wrap(data.getData());
try {
- write_.flush();
+ /* Because the SocketChannel is non-blocking, we have to
+ * be prepared to cope with the write operation not
+ * consuming all of the data
+ */
+ boolean finishedWriting = false;
+ while (!finishedWriting && !disconnecting_) {
+ socketChannel_.write(bb);
+ finishedWriting = (bb.remaining() == 0);
+ if (!finishedWriting) {
+ try {
+ /* Give the output buffer a chance to empty */
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e) {
+ /* Perhaps someone has set disconnecting_ */
+ }
+ }
+ }
} catch (IOException ex) {
disconnecting_ = true;
- handleDisconnected(Error.WriteError);
+ handleDisconnected(Error.WriteError);
}
+ writeBuffer_.remove(0);
}
+
ByteArray data = new ByteArray();
try {
- while (read_.available() != 0) {
- byte[] b = new byte[1024];
- int i = read_.read(b,0,b.length);
- if (i > 0) {
- for (int j=0; j<i; j++) {
- data.append(b[j]);
- }
+ ByteBuffer bb = ByteBuffer.allocate(1024);
+
+ int count = socketChannel_.read(bb);
+ while (count > 0) {
+ bb.flip();
+ byte[] result = new byte[bb.remaining()];
+ bb.get(result);
+ bb.compact();
+ for (int i=0; i<result.length; i++) {
+ data.append(result[i]);
}
- }
+ count = socketChannel_.read(bb);
+ }
+ if (count == -1) {
+ /* socketChannel input has reached "end-of-stream", which
+ * we regard as meaning that the socket has been closed
+ */
+ throw new IOException("socketChannel_.read returned -1");
+ }
} catch (IOException ex) {
handleDisconnected(Error.ReadError);
return;
}
+
if (!data.isEmpty()) {
handleDataRead(data);
}
+
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
@@ -92,9 +113,7 @@ public class JavaConnection extends Connection implements EventOwner {
}
}
try {
- read_.close();
- write_.close();
- socket_.close();
+ socketChannel_.close();
} catch (IOException ex) {
/* Do we need to return an error if we're already trying to close? */
}
@@ -145,6 +164,7 @@ public class JavaConnection extends Connection implements EventOwner {
@Override
public void connect(HostAddressPort address) {
+ hostAddressPort_ = address;
worker_ = new Worker(address);
Thread workerThread = new Thread(worker_);
workerThread.setDaemon(true);
@@ -163,20 +183,21 @@ public class JavaConnection extends Connection implements EventOwner {
@Override
public HostAddressPort getLocalAddress() {
- return new HostAddressPort(new HostAddress(socket_.getLocalAddress()), socket_.getLocalPort());
+ return hostAddressPort_;
}
@Override
public String toString()
{
return "JavaConnection " +
- (socket_ == null ? "with no socket configured" : "for " + getLocalAddress()) +
+ (socketChannel_ == null ? "with no socket configured" : "for " + getLocalAddress()) +
(disconnecting_ ? " (disconnecting)" : "");
}
private final EventLoop eventLoop_;
private boolean disconnecting_ = false;
- private Socket socket_;
+ private SocketChannel socketChannel_;
private Worker worker_;
+ private HostAddressPort hostAddressPort_;
}