diff options
Diffstat (limited to 'src/com/isode')
-rw-r--r-- | src/com/isode/stroke/network/JavaConnection.java | 92 |
1 files changed, 75 insertions, 17 deletions
diff --git a/src/com/isode/stroke/network/JavaConnection.java b/src/com/isode/stroke/network/JavaConnection.java index a371ae9..29e3633 100644 --- a/src/com/isode/stroke/network/JavaConnection.java +++ b/src/com/isode/stroke/network/JavaConnection.java @@ -24,6 +24,10 @@ import com.isode.stroke.eventloop.EventLoop; import com.isode.stroke.eventloop.EventOwner; public class JavaConnection extends Connection implements EventOwner { + + public interface ActivityWatcher { + void onConnectionActive(boolean isActive); + } private class Worker implements Runnable { @@ -32,9 +36,11 @@ public class JavaConnection extends Connection implements EventOwner { private SelectionKey selectionKey_; // not volatile - only set/tested with selectorLock_ held private final Object selectorLock_ = new Object(); // use private lock object that is not visible elsewhere private boolean disconnected_ = false; + private final ActivityWatcher watcher_; - public Worker(HostAddressPort address) { + public Worker(HostAddressPort address, ActivityWatcher watcher) { address_ = address; + watcher_ = watcher; } private boolean isWriteNeeded() { @@ -61,10 +67,15 @@ public class JavaConnection extends Connection implements EventOwner { handleConnected(true); return; } + handleConnected(false); + if (watcher_ != null) { + watcher_.onConnectionActive(true); + } final SelectionKey selectionKey = selectionKey_; final Selector selector = selectionKey.selector(); + int recentlyActiveCounter = 0; while (!disconnecting_ || isWriteNeeded()) { int ready = 0; @@ -105,7 +116,20 @@ public class JavaConnection extends Connection implements EventOwner { * for write is active would still not be interrupted, so it would not solve * everything. */ - ready = selector.select(); + if (watcher_ != null) { + if (recentlyActiveCounter-- > 0) { + // Active last time around the loop + // so just sleep for up to 100ms. + // This avoids poking the watcher too frequently. + ready = selector.select(100); + } else { + watcher_.onConnectionActive(false); + ready = selector.select(); + watcher_.onConnectionActive(true); + } + } else { + ready = selector.select(); + } } catch (ClosedSelectorException e) { break; } catch (IOException e) { @@ -135,6 +159,20 @@ public class JavaConnection extends Connection implements EventOwner { } } + if (writeNeeded || readNeeded) { + /* + * Use a count of 2 to cope with closely-spaced + * events. When the first completes then the active select, + * or more-likely the not-yet-active-select, will get + * a wakeup and so nothing will be signalled as ready + * but we still do not want to signal inactive just yet + * because there could already be another chunk of data + * ready to read. Even queueing up the response could + * cause this. + */ + recentlyActiveCounter = 2; + } + /* Handle any writing */ if (writeNeeded) { try { @@ -153,7 +191,7 @@ public class JavaConnection extends Connection implements EventOwner { /* Handle any reading */ if (readNeeded) { - final SafeByteArray data = new SafeByteArray(); + final SafeByteArray data = new SafeByteArray(); final boolean closed = doRead(data); if (!data.isEmpty()) { handleDataRead(data); @@ -167,20 +205,26 @@ public class JavaConnection extends Connection implements EventOwner { } handleDisconnected(null); } finally { - if(socketChannel_ != null) { - try { - socketChannel_.close(); - } catch (IOException ex) { - /* Do we need to return an error if we're already trying to close? */ - } - if(selectionKey_ != null) { + try { + if(socketChannel_ != null) { try { - synchronized (selectorLock_) { - selectionKey_.selector().close(); + socketChannel_.close(); + } catch (IOException ex) { + /* Do we need to return an error if we're already trying to close? */ + } + if(selectionKey_ != null) { + try { + synchronized (selectorLock_) { + selectionKey_.selector().close(); + } + } catch (IOException e) { } - } catch (IOException e) { } } + } finally { + if (watcher_ != null) { + watcher_.onConnectionActive(false); + } } } } @@ -397,13 +441,14 @@ public class JavaConnection extends Connection implements EventOwner { } - private JavaConnection(EventLoop eventLoop, boolean synchroniseReads) { + private JavaConnection(EventLoop eventLoop, boolean synchroniseReads, ActivityWatcher watcher) { eventLoop_ = eventLoop; synchroniseReads_ = synchroniseReads; + watcher_ = watcher; } public static JavaConnection create(EventLoop eventLoop) { - return new JavaConnection(eventLoop, false); + return new JavaConnection(eventLoop, false, null); } /** @@ -414,7 +459,19 @@ public class JavaConnection extends Connection implements EventOwner { * @return a new JavaConnection */ public static JavaConnection create(EventLoop eventLoop, boolean synchroniseReads) { - return new JavaConnection(eventLoop, synchroniseReads); + return new JavaConnection(eventLoop, synchroniseReads, null); + } + + /** + * Creates a new JavaConnection + * @param eventLoop the EventLoop for read and write events to be posted to + * @param synchroniseReads if true then data will not be read from the connection + * until the previous read has been processed by the EventLoop + * @param watcher handler to be signalled when connection becomes active or inactive + * @return a new JavaConnection + */ + public static JavaConnection create(EventLoop eventLoop, boolean synchroniseReads, ActivityWatcher watcher) { + return new JavaConnection(eventLoop, synchroniseReads, watcher); } @Override @@ -425,7 +482,7 @@ public class JavaConnection extends Connection implements EventOwner { @Override public void connect(HostAddressPort address) { - worker_ = new Worker(address); + worker_ = new Worker(address, watcher_); Thread workerThread = new Thread(worker_); workerThread.setDaemon(true); workerThread.setName("JavaConnection "+ address.toString()); @@ -470,5 +527,6 @@ public class JavaConnection extends Connection implements EventOwner { private SocketChannel socketChannel_; private Worker worker_; private final boolean synchroniseReads_; + private final ActivityWatcher watcher_; } |