summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNick Hudson <nick.hudson@isode.com>2013-07-23 11:07:56 (GMT)
committerNick Hudson <nick.hudson@isode.com>2013-07-26 10:00:01 (GMT)
commit4f743d361c2e84420d02d9c9d10304b7184b3170 (patch)
treee3fed647439a0e85e1feb685b54894bd2f0b5f27 /src/com/isode/stroke/network/JavaConnection.java
parent1006276e3c78c2098d8efd5dedff0ee87e0e006f (diff)
downloadstroke-4f743d361c2e84420d02d9c9d10304b7184b3170.zip
stroke-4f743d361c2e84420d02d9c9d10304b7184b3170.tar.bz2
Re-implement JavaConnection to use Selector
When investigating problems on Solaris, attention focused on the JavaConnection class, whose implementation appeared to be non-optimal. The original implementation had a loop which operated on a non-blocking socket, and looked something like this: while (!disconnecting) { while (something to write) { write data to socket; if write failed { sleep(100); // and try again } } try reading data from socket if (any data was read) { process data from socket; } sleep(100); } Because the socket is non-blocking, the reads/writes return straight away. This means that even when no data is being transferred, the loop is executing around ten times a second checking for any data to read/write. In one case (Solaris client talking to Solaris server on the same VM) we were consistently able to get into a state where a write fails to write any data, so that the "something to write" subloop never exits. This in turn means that the "try reading data" section of the main loop is never reached. Investigation failed to uncover why this problem occurs. The underlying socket appears to be returning EAGAIN (equivalent to EWOULDBLOCK), suggesting that the write fails because the client's local buffer is full. This in turn implies that the server isn't reading data quickly enough, leading to the buffers on the client side being full up. But this doesn't explain why, once things have got into this state, they never free up. At any rate, it was felt that the implementation above is not ideal because it is relying on a polling mechanism that is not efficient, rather than being event driven. So this change re-implements JavaConnection to use a Selector, which means that the main loop is event-driven. The new implementation looks like this while (!disconnected) { wait for selector if (disconnected) { break; } if something to write { try to write data; } if something to read { try to read data; } if still something to write { sleep(100); post wake event; // so that next wait completes straight away } } Test-information: Testing appears to show that the problems we saw on Solaris are no longer seen with this patch (Solaris tests still fail, but later on, which appears to be due to a separate problem). Testing shows that this leads to the thread spending much more time idle, and only being active when data is being read/written (unlike the original implementation which was looping ten times a second regardless of whether any data was being read/written). Testing using MLC seems to show the new implementation works OK. I was unable to provoke the "write buffer not completely written" case, so faked it by making the doWrite() method constrain its maximum write size to 200 bytes. By doing this I verified that the "leftOver" section of code was working properly (and incidentally fixed a problem with the the initial implementation of the patch that had been passing the wrong parameter to System.arrayCopy). Change-Id: I5a6191567ba7e9afdb9a26febf00eae72b00f6eb Signed-off-by: Nick Hudson <nick.hudson@isode.com>
Diffstat (limited to 'src/com/isode/stroke/network/JavaConnection.java')
-rw-r--r--src/com/isode/stroke/network/JavaConnection.java237
1 files changed, 175 insertions, 62 deletions
diff --git a/src/com/isode/stroke/network/JavaConnection.java b/src/com/isode/stroke/network/JavaConnection.java
index 7814f71..efbf54f 100644
--- a/src/com/isode/stroke/network/JavaConnection.java
+++ b/src/com/isode/stroke/network/JavaConnection.java
@@ -3,7 +3,7 @@
* All rights reserved.
*/
/*
- * Copyright (c) 2010-2012, Isode Limited, London, England.
+ * Copyright (c) 2010-2013, Isode Limited, London, England.
* All rights reserved.
*/
package com.isode.stroke.network;
@@ -12,6 +12,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
@@ -32,6 +34,10 @@ public class JavaConnection extends Connection implements EventOwner {
public Worker(HostAddressPort address) {
address_ = address;
}
+
+ private boolean isWriteNeeded() {
+ return (!writeBuffer_.isEmpty());
+ }
public void run() {
try {
@@ -42,79 +48,87 @@ public class JavaConnection extends Connection implements EventOwner {
* isn't what we want
*/
socketChannel_.configureBlocking(false);
+ selector_ = Selector.open();
+ selectionKey_ = socketChannel_.register(selector_, SelectionKey.OP_READ);
} catch (IOException ex) {
handleConnected(true);
return;
}
handleConnected(false);
while (!disconnecting_) {
- while (!writeBuffer_.isEmpty()) {
- ByteArray data = writeBuffer_.get(0);
- ByteBuffer byteBuffer = ByteBuffer.wrap(data.getData());
- try {
- /* Because the SocketChannel is non-blocking, we have to
- * be prepared to cope with the write operation not
- * consuming all of the data
- */
- boolean finishedWriting = false;
- while (!finishedWriting && !disconnecting_) {
- socketChannel_.write(byteBuffer);
- finishedWriting = (byteBuffer.remaining() == 0);
- if (!finishedWriting) {
- try {
- /* Give the output buffer a chance to empty */
- Thread.sleep(100);
- }
- catch (InterruptedException e) {
- /* Perhaps someone has set disconnecting_ */
- }
- }
- }
- } catch (IOException ex) {
- disconnecting_ = true;
- handleDisconnected(Error.WriteError);
- }
- writeBuffer_.remove(0);
- }
- ByteArray data = new ByteArray();
+ /* This will block until something is ready on the selector,
+ * including someone calling selector.wakeup(), or until the
+ * thread is interrupted
+ */
try {
- ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
-
- int count = socketChannel_.read(byteBuffer);
- while (count > 0) {
- byteBuffer.flip();
- byte[] result = new byte[byteBuffer.remaining()];
- byteBuffer.get(result);
- byteBuffer.compact();
- for (int i=0; i<result.length; i++) {
- data.append(result[i]);
+ selector_.select();
+ } catch (IOException e) {
+ disconnected_ = true;
+ handleDisconnected(null);
+ break;
+ }
+
+ /* Something(s) happened. See what needs doing */
+ if (disconnecting_) {
+ handleDisconnected(null);
+ /* No point doing anything else */
+ break;
+ }
+ boolean writeNeeded = isWriteNeeded();
+ boolean readNeeded = selectionKey_.isReadable();
+
+ { /* Handle any writing */
+ if (writeNeeded) {
+ try {
+ doWrite();
+ }
+ catch (IOException e) {
+ disconnecting_ = true;
+ handleDisconnected(Error.WriteError);
}
- count = socketChannel_.read(byteBuffer);
- }
- if (count == -1) {
- /* socketChannel input has reached "end-of-stream", which
- * we regard as meaning that the socket has been closed
- */
- throw new IOException("socketChannel_.read returned -1");
}
- } catch (IOException ex) {
- handleDisconnected(Error.ReadError);
- return;
}
- if (!data.isEmpty()) {
- handleDataRead(data);
- }
+ { /* Handle any reading */
+ ByteArray dataRead;
- try {
- Thread.sleep(100);
- } catch (InterruptedException ex) {
- /* We've been woken up, probably to force us to do something.*/
+ if (readNeeded) {
+ try {
+ dataRead = doRead();
+ if (!dataRead.isEmpty()) {
+ handleDataRead(dataRead);
+ }
+ } catch (IOException ex) {
+ handleDisconnected(Error.ReadError);
+ return;
+ }
+ }
+ }
+
+ if (isWriteNeeded() && !disconnected_) {
+ /* There is something that's not been written yet.
+ * This might happen because the "doWrite()" didn't
+ * write the complete buffer, or because our "write()"
+ * method was called (perhaps more than once) since
+ * this thread was woken.
+ *
+ * Give the buffer a chance to empty
+ */
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e) {
+ /* */
+ }
+ /* Force the next iteration of the loop to wake up
+ * straight away, and check all conditions again
+ */
+ selector_.wakeup();
}
}
handleDisconnected(null);
- }finally {
+ } finally {
if(socketChannel_ != null) {
try {
socketChannel_.close();
@@ -125,10 +139,102 @@ public class JavaConnection extends Connection implements EventOwner {
}
}
+ /**
+ * Called when there's something in the writeBuffer to be written.
+ * Will remove from writeBuffer_ anything that got written.
+ * @throws IOException if an error occurs when trying to write to the
+ * socket
+ */
+ private void doWrite() throws IOException {
+ if (!isWriteNeeded()) {
+ return;
+ }
+
+ ByteArray data = writeBuffer_.get(0);
+ byte[] bytes = data.getData();
+ int bytesToWrite = bytes.length;
+
+ if (bytesToWrite == 0) {
+ /*
+ * Not sure if this can happen, but does no harm to check
+ */
+ writeBuffer_.remove(0);
+ return;
+ }
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+
+ /*
+ * Because the SocketChannel is non-blocking, we have to
+ * be prepared to cope with the write operation not
+ * consuming all (or any!) of the data
+ */
+ boolean finishedWriting = false;
+ int bytesWritten = socketChannel_.write(byteBuffer);
+ finishedWriting = (byteBuffer.remaining() == 0);
+ if (finishedWriting) {
+ writeBuffer_.remove(0);
+ return;
+ }
+ /* Was anything written at all? */
+ if (bytesWritten == 0) {
+ /* Leave the buffer in the array so that it'll get tried
+ * again later
+ */
+ return;
+ }
+
+ /* The buffer was *partly* written. This means we have to
+ * remove that part. We do this by creating a new ByteArray
+ * with the remaining bytes in, and replacing the first
+ * element in the list with that.
+ */
+ byte[] remainingBytes = new byte[bytesToWrite - bytesWritten];
+ System.arraycopy(bytes, bytesWritten,remainingBytes,0, remainingBytes.length);
+ ByteArray leftOver = new ByteArray(remainingBytes);
+
+ writeBuffer_.set(0, leftOver);
+ return;
+ }
+
+ /**
+ * Called when there's something that's come in on the socket
+ * @return a ByteBuffer containing bytes read (may be empty, won't be null)
+ * @throws IOException if the socket got closed
+ */
+ private ByteArray doRead() throws IOException {
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ ByteArray data = new ByteArray();
+
+ int count = socketChannel_.read(byteBuffer);
+ if (count == 0) {
+ return data;
+ }
+ while (count > 0) {
+ byteBuffer.flip();
+ byte[] result = new byte[byteBuffer.remaining()];
+ byteBuffer.get(result);
+ byteBuffer.compact();
+ for (int i=0; i<result.length; i++) {
+ data.append(result[i]);
+ }
+
+ count = socketChannel_.read(byteBuffer);
+ }
+ if (count == -1) {
+ /* socketChannel input has reached "end-of-stream", which
+ * we regard as meaning that the socket has been closed
+ */
+ throw new IOException("socketChannel_.read returned -1");
+ }
+ return data;
+ }
+
private void handleConnected(final boolean error) {
+
eventLoop_.postEvent(new Callback() {
public void run() {
- onConnectFinished.emit(error);
+ onConnectFinished.emit(Boolean.valueOf(error));
}
});
}
@@ -152,9 +258,6 @@ public class JavaConnection extends Connection implements EventOwner {
});
}
- public void write(ByteArray data) {
- writeBuffer_.add(data);
- }
}
private JavaConnection(EventLoop eventLoop) {
@@ -176,17 +279,25 @@ public class JavaConnection extends Connection implements EventOwner {
worker_ = new Worker(address);
Thread workerThread = new Thread(worker_);
workerThread.setDaemon(true);
+ workerThread.setName("JavaConnection "+ address.toString());
workerThread.start();
}
@Override
public void disconnect() {
disconnecting_ = true;
+ if (selector_ != null) {
+ selector_.wakeup();
+ }
}
@Override
public void write(ByteArray data) {
worker_.writeBuffer_.add(data);
+ if (selector_ != null) {
+ selector_.wakeup();
+ }
+
}
@Override
@@ -213,6 +324,8 @@ public class JavaConnection extends Connection implements EventOwner {
private boolean disconnecting_ = false;
private boolean disconnected_ = false;
private SocketChannel socketChannel_;
+ private Selector selector_;
+ private SelectionKey selectionKey_;
private Worker worker_;
}