summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlan Young <consult.awy@gmail.com>2015-09-07 14:33:02 (GMT)
committerAlan Young <consult.awy@gmail.com>2015-09-11 06:02:21 (GMT)
commita95753ca942dd6bfe89a9ce6287909a69bf93e96 (patch)
tree3a8ef009d37c8d98e7d3a654df7b2f791384b61f
parent57c4ee4ee04e42662fe829244dbdeb747cb22162 (diff)
downloadstroke-a95753ca942dd6bfe89a9ce6287909a69bf93e96.zip
stroke-a95753ca942dd6bfe89a9ce6287909a69bf93e96.tar.bz2
Make JavaConnection fully event driven - no 100ms spin
Refactor JavaConnection to make it fully event-driven via select for both reads and writes. Remove the 100ms spin when there are pending writes. Encapslate all the management of the select mask and the write queue within Worker and protect via a common lock. Change-Id: I1a709f9b12a949448923f51c2d434746c9190c9d
-rw-r--r--src/com/isode/stroke/network/JavaConnection.java274
1 files changed, 179 insertions, 95 deletions
diff --git a/src/com/isode/stroke/network/JavaConnection.java b/src/com/isode/stroke/network/JavaConnection.java
index db748e8..3dfcc05 100644
--- a/src/com/isode/stroke/network/JavaConnection.java
+++ b/src/com/isode/stroke/network/JavaConnection.java
@@ -9,6 +9,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
@@ -17,7 +19,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import com.isode.stroke.base.ByteArray;
import com.isode.stroke.base.SafeByteArray;
import com.isode.stroke.eventloop.Event.Callback;
import com.isode.stroke.eventloop.EventLoop;
@@ -49,7 +50,8 @@ public class JavaConnection extends Connection implements EventOwner {
private final HostAddressPort address_;
private final List<byte[]> writeBuffer_ = Collections.synchronizedList(new ArrayList<byte[]>());
- private volatile SelectionKey selectionKey_;
+ private SelectionKey selectionKey_; // not volatile - only set/tested with selectorLock_ held
+ private final Object selectorLock_ = new Object(); // use private lock object that is not visible elsewhere
private boolean disconnected_ = false;
public Worker(HostAddressPort address) {
@@ -64,97 +66,138 @@ public class JavaConnection extends Connection implements EventOwner {
try {
try {
socketChannel_ = SocketChannel.open(
- new InetSocketAddress(address_.getAddress().getInetAddress(),address_.getPort()));
+ new InetSocketAddress(address_.getAddress().getInetAddress(),address_.getPort()));
/* By default, SocketChannels start off in blocking mode, which
* isn't what we want
*/
socketChannel_.configureBlocking(false);
- selector_ = Selector.open();
- selectionKey_ = socketChannel_.register(selector_, SelectionKey.OP_READ);
- } catch (IOException ex) {
+ synchronized (selectorLock_) {
+ int ops = SelectionKey.OP_READ;
+ if (isWriteNeeded()) {
+ ops |= SelectionKey.OP_WRITE; // could have been queued before selectionKey_ established
+ }
+ selectionKey_ = socketChannel_.register(Selector.open(), ops);
+ }
+ } catch (IOException ex) { // includes ClosedChannelException
handleConnected(true);
return;
}
handleConnected(false);
+ final SelectionKey selectionKey = selectionKey_;
+ final Selector selector = selectionKey.selector();
while (!disconnecting_ || isWriteNeeded()) {
- /* Something(s) happened. See what needs doing */
- boolean writeNeeded = isWriteNeeded();
- boolean readNeeded = (selectionKey_.interestOps() & SelectionKey.OP_READ) != 0
- && selectionKey_.isReadable();
+ int ready = 0;
- { /* Handle any writing */
- if (writeNeeded) {
- try {
- doWrite();
- }
- catch (IOException e) {
- disconnecting_ = true;
- handleDisconnected(Error.WriteError);
- }
- }
+ /* This will block until something is ready on the selector,
+ * including someone calling selector.wakeup(), or until the
+ * thread is interrupted
+ */
+ try {
+ /*
+ * This should actually be thread safe, despite first appearances.
+ * Selector.select() could have examined the (empty) bit mask
+ * but the mask could be updated and Selector.wakeup() called
+ * before the action OS select starts. However, Selector.wakeup()
+ * will cause the next Selector.select() to return immediately
+ * in any case. A memory barrier is required somewhere in the loop.
+ *
+ * The Android implementation (java.nio.SelectorImpl) appears to honour
+ * the somewhat poorly worded contract of Selector.wakeup(). There is
+ * no window between testing the ops and starting the underlying select
+ * as it uses a loopback connection (pipe) to communicate the wakeup event.
+ * The same is true of the Solaris implementations (PollSelectorImpl
+ * & DevPollSelectorImpl).
+ *
+ * It is probably reasonable to assume that OpenJDK-based implementations
+ * are thread-safe.
+ *
+ * GCC 4.8 libjava has a thread-unsafe window (EpollSelectorImpl).
+ *
+ *
+ * Another option would be to wait on selectorLock_ while selectionKey's
+ * interestOps is empty (and not disconnecting). Since bits are only ever
+ * removed from the set by this thread (even though they are added by others)
+ * then such a strategy would remove any chance of selecting with an empty mask,
+ * and the consequent issue of a potentially (buggy)thread-unsafe implementation
+ * of Selector.wakeup(). This would complicate the inter-thread communication,
+ * requiring signalling the lock as well as calling Selector.wakeup(). It is
+ * also possible that a weird read-before-write requirement when a select only
+ * for write is active would still not be interrupted, so it would not solve
+ * everything.
+ */
+ ready = selector.select();
+ } catch (ClosedSelectorException e) {
+ break;
+ } catch (IOException e) {
+ break;
}
-
- { /* Handle any reading */
- SafeByteArray dataRead;
-
- if (readNeeded) {
- ReadResult rr = doRead();
- dataRead = rr.dataRead_;
- if (!dataRead.isEmpty()) {
- handleDataRead(dataRead);
- }
- if (rr.socketClosed_) {
- handleDisconnected(Error.ReadError);
- return;
+
+ /* See what needs doing */
+ boolean writeNeeded = false;
+ boolean readNeeded = false;
+ /*
+ * This synchronized block serves two purposes:
+ * 1. Guard against the small chance that selectionKey.interestOps()
+ * may be thread-unsafe relative to setInterestOp() below.
+ * 2. Provide a memory barrier to ensure that updated select ops are
+ * reflected in this thread's view of the world.
+ */
+ synchronized (selectorLock_) {
+ try {
+ if (ready > 0) {
+ final int ops = selectionKey.interestOps() & selectionKey.readyOps();
+ writeNeeded = (ops & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE;
+ readNeeded = (ops & SelectionKey.OP_READ) == SelectionKey.OP_READ;
+ selector.selectedKeys().clear();
}
+ } catch (CancelledKeyException e) {
+ // leave it to select to catch the disconnect
}
}
- if (isWriteNeeded() && !disconnected_) {
- /* There is something that's not been written yet.
- * This might happen because the "doWrite()" didn't
- * write the complete buffer, or because our "write()"
- * method was called (perhaps more than once) since
- * this thread was woken.
- *
- * Give the buffer a chance to empty
- */
+ /* Handle any writing */
+ if (writeNeeded) {
try {
- Thread.sleep(100);
+ doWrite();
+ synchronized (selectorLock_) {
+ if (!isWriteNeeded()) {
+ clearInterestOp(SelectionKey.OP_WRITE);
+ }
+ }
}
- catch (InterruptedException e) {
- /* */
+ catch (IOException e) {
+ disconnecting_ = true;
+ handleDisconnected(Error.WriteError);
}
- /* Force the next iteration of the loop to wake up
- * straight away, and check all conditions again
- */
- selector_.wakeup();
}
- /* This will block until something is ready on the selector,
- * including someone calling selector.wakeup(), or until the
- * thread is interrupted
- */
- try {
- selector_.select();
- } catch (IOException e) {
- disconnected_ = true;
- break;
+ /* Handle any reading */
+ if (readNeeded) {
+ final ReadResult rr = doRead();
+ final SafeByteArray dataRead = rr.dataRead_;
+ if (!dataRead.isEmpty()) {
+ handleDataRead(dataRead);
+ }
+ if (rr.socketClosed_) {
+ handleDisconnected(Error.ReadError);
+ return;
+ }
}
+
}
handleDisconnected(null);
} finally {
if(socketChannel_ != null) {
try {
- socketChannel_.close();
+ socketChannel_.close();
} catch (IOException ex) {
/* Do we need to return an error if we're already trying to close? */
}
- if(selector_ != null) {
+ if(selectionKey_ != null) {
try {
synchronized (selectorLock_) {
- selector_.close();
+ selectionKey_.selector().close();
}
} catch (IOException e) {
}
@@ -162,6 +205,74 @@ public class JavaConnection extends Connection implements EventOwner {
}
}
}
+
+ /**
+ * Set one or more SelectionKey bits in the select mask.
+ *
+ * May be called from outside Worker thread
+ * May recursively lock selectorLock_
+ *
+ * @param op - OP_READ | OP_WRITE: may be 0 just to force wakeup
+ */
+ private void setInterestOp(int op) {
+ synchronized (selectorLock_) {
+ final SelectionKey key = selectionKey_;
+ if (key != null && key.isValid()) {
+ try {
+ key.interestOps(key.interestOps() | op);
+
+ /*
+ * We could check that we have actually changed the mask before
+ * invoking a wakeup. The usage pattern, however, is such that
+ * such a check would almost always be true.
+ */
+ final Selector selector = key.selector();
+ // Check "isOpen" to avoid Android crash see
+ // https://code.google.com/p/android/issues/detail?id=80785
+ if (selector.isOpen()) {
+ selector.wakeup();
+ }
+ } catch (CancelledKeyException e) {
+ // channel has been closed
+ }
+ }
+ }
+ }
+
+ /**
+ * Clear one or more SelectionKey bits in the select mask.
+ *
+ * May be called from outside Worker thread
+ * May recursively lock selectorLock_
+ *
+ * @param op - OP_READ | OP_WRITE
+ */
+ private void clearInterestOp(int op) {
+ synchronized (selectorLock_) {
+ final SelectionKey key = selectionKey_;
+ if (key != null && key.isValid()) {
+ try {
+ key.interestOps(key.interestOps() & ~op);
+ // No need to wakeup the selector
+ } catch (CancelledKeyException e) {
+ // channel has been closed
+ }
+ }
+ }
+ }
+
+ /**
+ * Called from outside Worker thread
+ */
+ public void queueWrite(byte[] data) {
+ synchronized (selectorLock_) {
+ writeBuffer_.add(data);
+ if (writeBuffer_.size() == 1) {
+ setInterestOp(SelectionKey.OP_WRITE);
+ }
+ }
+ }
+
/**
* Called when there's something in the writeBuffer to be written.
@@ -302,22 +413,13 @@ public class JavaConnection extends Connection implements EventOwner {
private void handleDataRead(final SafeByteArray data) {
if (synchroniseReads_) {
- selectionKey_.interestOps(0);
+ clearInterestOp(SelectionKey.OP_READ);
}
eventLoop_.postEvent(new Callback() {
public void run() {
onDataRead.emit(data);
- // Check "isOpen" to Avoid Android crash see
- // https://code.google.com/p/android/issues/detail?id=80785
- if (synchroniseReads_ && selector_ != null) {
- synchronized (selectorLock_) {
- if (selectionKey_.isValid()) {
- selectionKey_.interestOps(SelectionKey.OP_READ);
- }
- if (selector_.isOpen()) {
- selector_.wakeup();
- }
- }
+ if (synchroniseReads_) {
+ setInterestOp(SelectionKey.OP_READ);
}
}
});
@@ -363,30 +465,14 @@ public class JavaConnection extends Connection implements EventOwner {
@Override
public void disconnect() {
disconnecting_ = true;
- // Check "isOpen" to Avoid Android crash see
- // https://code.google.com/p/android/issues/detail?id=80785
- if (selector_ != null) {
- synchronized (selectorLock_) {
- if (selector_.isOpen()) {
- selector_.wakeup();
- }
- }
+ if (worker_ != null) {
+ worker_.setInterestOp(0); // force wakeup
}
}
@Override
public void write(SafeByteArray data) {
- worker_.writeBuffer_.add(data.getData());
- // Check "isOpen" to Avoid Android crash see
- // https://code.google.com/p/android/issues/detail?id=80785
- if (selector_ != null) {
- synchronized (selectorLock_) {
- if (selector_.isOpen()) {
- selector_.wakeup();
- }
- }
- }
-
+ worker_.queueWrite(data.getData());
}
@Override
@@ -410,10 +496,8 @@ public class JavaConnection extends Connection implements EventOwner {
}
private final EventLoop eventLoop_;
- private boolean disconnecting_ = false;
+ private volatile boolean disconnecting_ = false;
private SocketChannel socketChannel_;
- private Selector selector_;
- private final Object selectorLock_ = new Object(); // use private lock object that is not visible elsewhere
private Worker worker_;
private final boolean synchroniseReads_;