summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/com/isode/stroke/network/JavaConnection.java274
1 files changed, 179 insertions, 95 deletions
diff --git a/src/com/isode/stroke/network/JavaConnection.java b/src/com/isode/stroke/network/JavaConnection.java
index db748e8..3dfcc05 100644
--- a/src/com/isode/stroke/network/JavaConnection.java
+++ b/src/com/isode/stroke/network/JavaConnection.java
@@ -9,6 +9,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
@@ -17,7 +19,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import com.isode.stroke.base.ByteArray;
import com.isode.stroke.base.SafeByteArray;
import com.isode.stroke.eventloop.Event.Callback;
import com.isode.stroke.eventloop.EventLoop;
@@ -49,7 +50,8 @@ public class JavaConnection extends Connection implements EventOwner {
private final HostAddressPort address_;
private final List<byte[]> writeBuffer_ = Collections.synchronizedList(new ArrayList<byte[]>());
- private volatile SelectionKey selectionKey_;
+ private SelectionKey selectionKey_; // not volatile - only set/tested with selectorLock_ held
+ private final Object selectorLock_ = new Object(); // use private lock object that is not visible elsewhere
private boolean disconnected_ = false;
public Worker(HostAddressPort address) {
@@ -64,97 +66,138 @@ public class JavaConnection extends Connection implements EventOwner {
try {
try {
socketChannel_ = SocketChannel.open(
- new InetSocketAddress(address_.getAddress().getInetAddress(),address_.getPort()));
+ new InetSocketAddress(address_.getAddress().getInetAddress(),address_.getPort()));
/* By default, SocketChannels start off in blocking mode, which
* isn't what we want
*/
socketChannel_.configureBlocking(false);
- selector_ = Selector.open();
- selectionKey_ = socketChannel_.register(selector_, SelectionKey.OP_READ);
- } catch (IOException ex) {
+ synchronized (selectorLock_) {
+ int ops = SelectionKey.OP_READ;
+ if (isWriteNeeded()) {
+ ops |= SelectionKey.OP_WRITE; // could have been queued before selectionKey_ established
+ }
+ selectionKey_ = socketChannel_.register(Selector.open(), ops);
+ }
+ } catch (IOException ex) { // includes ClosedChannelException
handleConnected(true);
return;
}
handleConnected(false);
+ final SelectionKey selectionKey = selectionKey_;
+ final Selector selector = selectionKey.selector();
while (!disconnecting_ || isWriteNeeded()) {
- /* Something(s) happened. See what needs doing */
- boolean writeNeeded = isWriteNeeded();
- boolean readNeeded = (selectionKey_.interestOps() & SelectionKey.OP_READ) != 0
- && selectionKey_.isReadable();
+ int ready = 0;
- { /* Handle any writing */
- if (writeNeeded) {
- try {
- doWrite();
- }
- catch (IOException e) {
- disconnecting_ = true;
- handleDisconnected(Error.WriteError);
- }
- }
+ /* This will block until something is ready on the selector,
+ * including someone calling selector.wakeup(), or until the
+ * thread is interrupted
+ */
+ try {
+ /*
+ * This should actually be thread safe, despite first appearances.
+ * Selector.select() could have examined the (empty) bit mask
+ * but the mask could be updated and Selector.wakeup() called
+ * before the action OS select starts. However, Selector.wakeup()
+ * will cause the next Selector.select() to return immediately
+ * in any case. A memory barrier is required somewhere in the loop.
+ *
+ * The Android implementation (java.nio.SelectorImpl) appears to honour
+ * the somewhat poorly worded contract of Selector.wakeup(). There is
+ * no window between testing the ops and starting the underlying select
+ * as it uses a loopback connection (pipe) to communicate the wakeup event.
+ * The same is true of the Solaris implementations (PollSelectorImpl
+ * & DevPollSelectorImpl).
+ *
+ * It is probably reasonable to assume that OpenJDK-based implementations
+ * are thread-safe.
+ *
+ * GCC 4.8 libjava has a thread-unsafe window (EpollSelectorImpl).
+ *
+ *
+ * Another option would be to wait on selectorLock_ while selectionKey's
+ * interestOps is empty (and not disconnecting). Since bits are only ever
+ * removed from the set by this thread (even though they are added by others)
+ * then such a strategy would remove any chance of selecting with an empty mask,
+ * and the consequent issue of a potentially (buggy)thread-unsafe implementation
+ * of Selector.wakeup(). This would complicate the inter-thread communication,
+ * requiring signalling the lock as well as calling Selector.wakeup(). It is
+ * also possible that a weird read-before-write requirement when a select only
+ * for write is active would still not be interrupted, so it would not solve
+ * everything.
+ */
+ ready = selector.select();
+ } catch (ClosedSelectorException e) {
+ break;
+ } catch (IOException e) {
+ break;
}
-
- { /* Handle any reading */
- SafeByteArray dataRead;
-
- if (readNeeded) {
- ReadResult rr = doRead();
- dataRead = rr.dataRead_;
- if (!dataRead.isEmpty()) {
- handleDataRead(dataRead);
- }
- if (rr.socketClosed_) {
- handleDisconnected(Error.ReadError);
- return;
+
+ /* See what needs doing */
+ boolean writeNeeded = false;
+ boolean readNeeded = false;
+ /*
+ * This synchronized block serves two purposes:
+ * 1. Guard against the small chance that selectionKey.interestOps()
+ * may be thread-unsafe relative to setInterestOp() below.
+ * 2. Provide a memory barrier to ensure that updated select ops are
+ * reflected in this thread's view of the world.
+ */
+ synchronized (selectorLock_) {
+ try {
+ if (ready > 0) {
+ final int ops = selectionKey.interestOps() & selectionKey.readyOps();
+ writeNeeded = (ops & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE;
+ readNeeded = (ops & SelectionKey.OP_READ) == SelectionKey.OP_READ;
+ selector.selectedKeys().clear();
}
+ } catch (CancelledKeyException e) {
+ // leave it to select to catch the disconnect
}
}
- if (isWriteNeeded() && !disconnected_) {
- /* There is something that's not been written yet.
- * This might happen because the "doWrite()" didn't
- * write the complete buffer, or because our "write()"
- * method was called (perhaps more than once) since
- * this thread was woken.
- *
- * Give the buffer a chance to empty
- */
+ /* Handle any writing */
+ if (writeNeeded) {
try {
- Thread.sleep(100);
+ doWrite();
+ synchronized (selectorLock_) {
+ if (!isWriteNeeded()) {
+ clearInterestOp(SelectionKey.OP_WRITE);
+ }
+ }
}
- catch (InterruptedException e) {
- /* */
+ catch (IOException e) {
+ disconnecting_ = true;
+ handleDisconnected(Error.WriteError);
}
- /* Force the next iteration of the loop to wake up
- * straight away, and check all conditions again
- */
- selector_.wakeup();
}
- /* This will block until something is ready on the selector,
- * including someone calling selector.wakeup(), or until the
- * thread is interrupted
- */
- try {
- selector_.select();
- } catch (IOException e) {
- disconnected_ = true;
- break;
+ /* Handle any reading */
+ if (readNeeded) {
+ final ReadResult rr = doRead();
+ final SafeByteArray dataRead = rr.dataRead_;
+ if (!dataRead.isEmpty()) {
+ handleDataRead(dataRead);
+ }
+ if (rr.socketClosed_) {
+ handleDisconnected(Error.ReadError);
+ return;
+ }
}
+
}
handleDisconnected(null);
} finally {
if(socketChannel_ != null) {
try {
- socketChannel_.close();
+ socketChannel_.close();
} catch (IOException ex) {
/* Do we need to return an error if we're already trying to close? */
}
- if(selector_ != null) {
+ if(selectionKey_ != null) {
try {
synchronized (selectorLock_) {
- selector_.close();
+ selectionKey_.selector().close();
}
} catch (IOException e) {
}
@@ -162,6 +205,74 @@ public class JavaConnection extends Connection implements EventOwner {
}
}
}
+
+ /**
+ * Set one or more SelectionKey bits in the select mask.
+ *
+ * May be called from outside Worker thread
+ * May recursively lock selectorLock_
+ *
+ * @param op - OP_READ | OP_WRITE: may be 0 just to force wakeup
+ */
+ private void setInterestOp(int op) {
+ synchronized (selectorLock_) {
+ final SelectionKey key = selectionKey_;
+ if (key != null && key.isValid()) {
+ try {
+ key.interestOps(key.interestOps() | op);
+
+ /*
+ * We could check that we have actually changed the mask before
+ * invoking a wakeup. The usage pattern, however, is such that
+ * such a check would almost always be true.
+ */
+ final Selector selector = key.selector();
+ // Check "isOpen" to avoid Android crash see
+ // https://code.google.com/p/android/issues/detail?id=80785
+ if (selector.isOpen()) {
+ selector.wakeup();
+ }
+ } catch (CancelledKeyException e) {
+ // channel has been closed
+ }
+ }
+ }
+ }
+
+ /**
+ * Clear one or more SelectionKey bits in the select mask.
+ *
+ * May be called from outside Worker thread
+ * May recursively lock selectorLock_
+ *
+ * @param op - OP_READ | OP_WRITE
+ */
+ private void clearInterestOp(int op) {
+ synchronized (selectorLock_) {
+ final SelectionKey key = selectionKey_;
+ if (key != null && key.isValid()) {
+ try {
+ key.interestOps(key.interestOps() & ~op);
+ // No need to wakeup the selector
+ } catch (CancelledKeyException e) {
+ // channel has been closed
+ }
+ }
+ }
+ }
+
+ /**
+ * Called from outside Worker thread
+ */
+ public void queueWrite(byte[] data) {
+ synchronized (selectorLock_) {
+ writeBuffer_.add(data);
+ if (writeBuffer_.size() == 1) {
+ setInterestOp(SelectionKey.OP_WRITE);
+ }
+ }
+ }
+
/**
* Called when there's something in the writeBuffer to be written.
@@ -302,22 +413,13 @@ public class JavaConnection extends Connection implements EventOwner {
private void handleDataRead(final SafeByteArray data) {
if (synchroniseReads_) {
- selectionKey_.interestOps(0);
+ clearInterestOp(SelectionKey.OP_READ);
}
eventLoop_.postEvent(new Callback() {
public void run() {
onDataRead.emit(data);
- // Check "isOpen" to Avoid Android crash see
- // https://code.google.com/p/android/issues/detail?id=80785
- if (synchroniseReads_ && selector_ != null) {
- synchronized (selectorLock_) {
- if (selectionKey_.isValid()) {
- selectionKey_.interestOps(SelectionKey.OP_READ);
- }
- if (selector_.isOpen()) {
- selector_.wakeup();
- }
- }
+ if (synchroniseReads_) {
+ setInterestOp(SelectionKey.OP_READ);
}
}
});
@@ -363,30 +465,14 @@ public class JavaConnection extends Connection implements EventOwner {
@Override
public void disconnect() {
disconnecting_ = true;
- // Check "isOpen" to Avoid Android crash see
- // https://code.google.com/p/android/issues/detail?id=80785
- if (selector_ != null) {
- synchronized (selectorLock_) {
- if (selector_.isOpen()) {
- selector_.wakeup();
- }
- }
+ if (worker_ != null) {
+ worker_.setInterestOp(0); // force wakeup
}
}
@Override
public void write(SafeByteArray data) {
- worker_.writeBuffer_.add(data.getData());
- // Check "isOpen" to Avoid Android crash see
- // https://code.google.com/p/android/issues/detail?id=80785
- if (selector_ != null) {
- synchronized (selectorLock_) {
- if (selector_.isOpen()) {
- selector_.wakeup();
- }
- }
- }
-
+ worker_.queueWrite(data.getData());
}
@Override
@@ -410,10 +496,8 @@ public class JavaConnection extends Connection implements EventOwner {
}
private final EventLoop eventLoop_;
- private boolean disconnecting_ = false;
+ private volatile boolean disconnecting_ = false;
private SocketChannel socketChannel_;
- private Selector selector_;
- private final Object selectorLock_ = new Object(); // use private lock object that is not visible elsewhere
private Worker worker_;
private final boolean synchroniseReads_;