diff options
-rw-r--r-- | src/com/isode/stroke/network/JavaConnection.java | 274 |
1 files changed, 179 insertions, 95 deletions
diff --git a/src/com/isode/stroke/network/JavaConnection.java b/src/com/isode/stroke/network/JavaConnection.java index db748e8..3dfcc05 100644 --- a/src/com/isode/stroke/network/JavaConnection.java +++ b/src/com/isode/stroke/network/JavaConnection.java @@ -9,6 +9,8 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; @@ -17,7 +19,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import com.isode.stroke.base.ByteArray; import com.isode.stroke.base.SafeByteArray; import com.isode.stroke.eventloop.Event.Callback; import com.isode.stroke.eventloop.EventLoop; @@ -49,7 +50,8 @@ public class JavaConnection extends Connection implements EventOwner { private final HostAddressPort address_; private final List<byte[]> writeBuffer_ = Collections.synchronizedList(new ArrayList<byte[]>()); - private volatile SelectionKey selectionKey_; + private SelectionKey selectionKey_; // not volatile - only set/tested with selectorLock_ held + private final Object selectorLock_ = new Object(); // use private lock object that is not visible elsewhere private boolean disconnected_ = false; public Worker(HostAddressPort address) { @@ -64,97 +66,138 @@ public class JavaConnection extends Connection implements EventOwner { try { try { socketChannel_ = SocketChannel.open( - new InetSocketAddress(address_.getAddress().getInetAddress(),address_.getPort())); + new InetSocketAddress(address_.getAddress().getInetAddress(),address_.getPort())); /* By default, SocketChannels start off in blocking mode, which * isn't what we want */ socketChannel_.configureBlocking(false); - selector_ = Selector.open(); - selectionKey_ = socketChannel_.register(selector_, SelectionKey.OP_READ); - } catch (IOException ex) { + synchronized (selectorLock_) { + int ops = SelectionKey.OP_READ; + if (isWriteNeeded()) { + ops |= SelectionKey.OP_WRITE; // could have been queued before selectionKey_ established + } + selectionKey_ = socketChannel_.register(Selector.open(), ops); + } + } catch (IOException ex) { // includes ClosedChannelException handleConnected(true); return; } handleConnected(false); + final SelectionKey selectionKey = selectionKey_; + final Selector selector = selectionKey.selector(); while (!disconnecting_ || isWriteNeeded()) { - /* Something(s) happened. See what needs doing */ - boolean writeNeeded = isWriteNeeded(); - boolean readNeeded = (selectionKey_.interestOps() & SelectionKey.OP_READ) != 0 - && selectionKey_.isReadable(); + int ready = 0; - { /* Handle any writing */ - if (writeNeeded) { - try { - doWrite(); - } - catch (IOException e) { - disconnecting_ = true; - handleDisconnected(Error.WriteError); - } - } + /* This will block until something is ready on the selector, + * including someone calling selector.wakeup(), or until the + * thread is interrupted + */ + try { + /* + * This should actually be thread safe, despite first appearances. + * Selector.select() could have examined the (empty) bit mask + * but the mask could be updated and Selector.wakeup() called + * before the action OS select starts. However, Selector.wakeup() + * will cause the next Selector.select() to return immediately + * in any case. A memory barrier is required somewhere in the loop. + * + * The Android implementation (java.nio.SelectorImpl) appears to honour + * the somewhat poorly worded contract of Selector.wakeup(). There is + * no window between testing the ops and starting the underlying select + * as it uses a loopback connection (pipe) to communicate the wakeup event. + * The same is true of the Solaris implementations (PollSelectorImpl + * & DevPollSelectorImpl). + * + * It is probably reasonable to assume that OpenJDK-based implementations + * are thread-safe. + * + * GCC 4.8 libjava has a thread-unsafe window (EpollSelectorImpl). + * + * + * Another option would be to wait on selectorLock_ while selectionKey's + * interestOps is empty (and not disconnecting). Since bits are only ever + * removed from the set by this thread (even though they are added by others) + * then such a strategy would remove any chance of selecting with an empty mask, + * and the consequent issue of a potentially (buggy)thread-unsafe implementation + * of Selector.wakeup(). This would complicate the inter-thread communication, + * requiring signalling the lock as well as calling Selector.wakeup(). It is + * also possible that a weird read-before-write requirement when a select only + * for write is active would still not be interrupted, so it would not solve + * everything. + */ + ready = selector.select(); + } catch (ClosedSelectorException e) { + break; + } catch (IOException e) { + break; } - - { /* Handle any reading */ - SafeByteArray dataRead; - - if (readNeeded) { - ReadResult rr = doRead(); - dataRead = rr.dataRead_; - if (!dataRead.isEmpty()) { - handleDataRead(dataRead); - } - if (rr.socketClosed_) { - handleDisconnected(Error.ReadError); - return; + + /* See what needs doing */ + boolean writeNeeded = false; + boolean readNeeded = false; + /* + * This synchronized block serves two purposes: + * 1. Guard against the small chance that selectionKey.interestOps() + * may be thread-unsafe relative to setInterestOp() below. + * 2. Provide a memory barrier to ensure that updated select ops are + * reflected in this thread's view of the world. + */ + synchronized (selectorLock_) { + try { + if (ready > 0) { + final int ops = selectionKey.interestOps() & selectionKey.readyOps(); + writeNeeded = (ops & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE; + readNeeded = (ops & SelectionKey.OP_READ) == SelectionKey.OP_READ; + selector.selectedKeys().clear(); } + } catch (CancelledKeyException e) { + // leave it to select to catch the disconnect } } - if (isWriteNeeded() && !disconnected_) { - /* There is something that's not been written yet. - * This might happen because the "doWrite()" didn't - * write the complete buffer, or because our "write()" - * method was called (perhaps more than once) since - * this thread was woken. - * - * Give the buffer a chance to empty - */ + /* Handle any writing */ + if (writeNeeded) { try { - Thread.sleep(100); + doWrite(); + synchronized (selectorLock_) { + if (!isWriteNeeded()) { + clearInterestOp(SelectionKey.OP_WRITE); + } + } } - catch (InterruptedException e) { - /* */ + catch (IOException e) { + disconnecting_ = true; + handleDisconnected(Error.WriteError); } - /* Force the next iteration of the loop to wake up - * straight away, and check all conditions again - */ - selector_.wakeup(); } - /* This will block until something is ready on the selector, - * including someone calling selector.wakeup(), or until the - * thread is interrupted - */ - try { - selector_.select(); - } catch (IOException e) { - disconnected_ = true; - break; + /* Handle any reading */ + if (readNeeded) { + final ReadResult rr = doRead(); + final SafeByteArray dataRead = rr.dataRead_; + if (!dataRead.isEmpty()) { + handleDataRead(dataRead); + } + if (rr.socketClosed_) { + handleDisconnected(Error.ReadError); + return; + } } + } handleDisconnected(null); } finally { if(socketChannel_ != null) { try { - socketChannel_.close(); + socketChannel_.close(); } catch (IOException ex) { /* Do we need to return an error if we're already trying to close? */ } - if(selector_ != null) { + if(selectionKey_ != null) { try { synchronized (selectorLock_) { - selector_.close(); + selectionKey_.selector().close(); } } catch (IOException e) { } @@ -162,6 +205,74 @@ public class JavaConnection extends Connection implements EventOwner { } } } + + /** + * Set one or more SelectionKey bits in the select mask. + * + * May be called from outside Worker thread + * May recursively lock selectorLock_ + * + * @param op - OP_READ | OP_WRITE: may be 0 just to force wakeup + */ + private void setInterestOp(int op) { + synchronized (selectorLock_) { + final SelectionKey key = selectionKey_; + if (key != null && key.isValid()) { + try { + key.interestOps(key.interestOps() | op); + + /* + * We could check that we have actually changed the mask before + * invoking a wakeup. The usage pattern, however, is such that + * such a check would almost always be true. + */ + final Selector selector = key.selector(); + // Check "isOpen" to avoid Android crash see + // https://code.google.com/p/android/issues/detail?id=80785 + if (selector.isOpen()) { + selector.wakeup(); + } + } catch (CancelledKeyException e) { + // channel has been closed + } + } + } + } + + /** + * Clear one or more SelectionKey bits in the select mask. + * + * May be called from outside Worker thread + * May recursively lock selectorLock_ + * + * @param op - OP_READ | OP_WRITE + */ + private void clearInterestOp(int op) { + synchronized (selectorLock_) { + final SelectionKey key = selectionKey_; + if (key != null && key.isValid()) { + try { + key.interestOps(key.interestOps() & ~op); + // No need to wakeup the selector + } catch (CancelledKeyException e) { + // channel has been closed + } + } + } + } + + /** + * Called from outside Worker thread + */ + public void queueWrite(byte[] data) { + synchronized (selectorLock_) { + writeBuffer_.add(data); + if (writeBuffer_.size() == 1) { + setInterestOp(SelectionKey.OP_WRITE); + } + } + } + /** * Called when there's something in the writeBuffer to be written. @@ -302,22 +413,13 @@ public class JavaConnection extends Connection implements EventOwner { private void handleDataRead(final SafeByteArray data) { if (synchroniseReads_) { - selectionKey_.interestOps(0); + clearInterestOp(SelectionKey.OP_READ); } eventLoop_.postEvent(new Callback() { public void run() { onDataRead.emit(data); - // Check "isOpen" to Avoid Android crash see - // https://code.google.com/p/android/issues/detail?id=80785 - if (synchroniseReads_ && selector_ != null) { - synchronized (selectorLock_) { - if (selectionKey_.isValid()) { - selectionKey_.interestOps(SelectionKey.OP_READ); - } - if (selector_.isOpen()) { - selector_.wakeup(); - } - } + if (synchroniseReads_) { + setInterestOp(SelectionKey.OP_READ); } } }); @@ -363,30 +465,14 @@ public class JavaConnection extends Connection implements EventOwner { @Override public void disconnect() { disconnecting_ = true; - // Check "isOpen" to Avoid Android crash see - // https://code.google.com/p/android/issues/detail?id=80785 - if (selector_ != null) { - synchronized (selectorLock_) { - if (selector_.isOpen()) { - selector_.wakeup(); - } - } + if (worker_ != null) { + worker_.setInterestOp(0); // force wakeup } } @Override public void write(SafeByteArray data) { - worker_.writeBuffer_.add(data.getData()); - // Check "isOpen" to Avoid Android crash see - // https://code.google.com/p/android/issues/detail?id=80785 - if (selector_ != null) { - synchronized (selectorLock_) { - if (selector_.isOpen()) { - selector_.wakeup(); - } - } - } - + worker_.queueWrite(data.getData()); } @Override @@ -410,10 +496,8 @@ public class JavaConnection extends Connection implements EventOwner { } private final EventLoop eventLoop_; - private boolean disconnecting_ = false; + private volatile boolean disconnecting_ = false; private SocketChannel socketChannel_; - private Selector selector_; - private final Object selectorLock_ = new Object(); // use private lock object that is not visible elsewhere private Worker worker_; private final boolean synchroniseReads_; |