diff options
-rw-r--r-- | src/com/isode/stroke/network/JavaConnection.java | 237 |
1 files changed, 175 insertions, 62 deletions
diff --git a/src/com/isode/stroke/network/JavaConnection.java b/src/com/isode/stroke/network/JavaConnection.java index 7814f71..efbf54f 100644 --- a/src/com/isode/stroke/network/JavaConnection.java +++ b/src/com/isode/stroke/network/JavaConnection.java @@ -3,7 +3,7 @@ * All rights reserved. */ /* - * Copyright (c) 2010-2012, Isode Limited, London, England. + * Copyright (c) 2010-2013, Isode Limited, London, England. * All rights reserved. */ package com.isode.stroke.network; @@ -12,6 +12,8 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Collections; @@ -32,6 +34,10 @@ public class JavaConnection extends Connection implements EventOwner { public Worker(HostAddressPort address) { address_ = address; } + + private boolean isWriteNeeded() { + return (!writeBuffer_.isEmpty()); + } public void run() { try { @@ -42,79 +48,87 @@ public class JavaConnection extends Connection implements EventOwner { * isn't what we want */ socketChannel_.configureBlocking(false); + selector_ = Selector.open(); + selectionKey_ = socketChannel_.register(selector_, SelectionKey.OP_READ); } catch (IOException ex) { handleConnected(true); return; } handleConnected(false); while (!disconnecting_) { - while (!writeBuffer_.isEmpty()) { - ByteArray data = writeBuffer_.get(0); - ByteBuffer byteBuffer = ByteBuffer.wrap(data.getData()); - try { - /* Because the SocketChannel is non-blocking, we have to - * be prepared to cope with the write operation not - * consuming all of the data - */ - boolean finishedWriting = false; - while (!finishedWriting && !disconnecting_) { - socketChannel_.write(byteBuffer); - finishedWriting = (byteBuffer.remaining() == 0); - if (!finishedWriting) { - try { - /* Give the output buffer a chance to empty */ - Thread.sleep(100); - } - catch (InterruptedException e) { - /* Perhaps someone has set disconnecting_ */ - } - } - } - } catch (IOException ex) { - disconnecting_ = true; - handleDisconnected(Error.WriteError); - } - writeBuffer_.remove(0); - } - ByteArray data = new ByteArray(); + /* This will block until something is ready on the selector, + * including someone calling selector.wakeup(), or until the + * thread is interrupted + */ try { - ByteBuffer byteBuffer = ByteBuffer.allocate(1024); - - int count = socketChannel_.read(byteBuffer); - while (count > 0) { - byteBuffer.flip(); - byte[] result = new byte[byteBuffer.remaining()]; - byteBuffer.get(result); - byteBuffer.compact(); - for (int i=0; i<result.length; i++) { - data.append(result[i]); + selector_.select(); + } catch (IOException e) { + disconnected_ = true; + handleDisconnected(null); + break; + } + + /* Something(s) happened. See what needs doing */ + if (disconnecting_) { + handleDisconnected(null); + /* No point doing anything else */ + break; + } + boolean writeNeeded = isWriteNeeded(); + boolean readNeeded = selectionKey_.isReadable(); + + { /* Handle any writing */ + if (writeNeeded) { + try { + doWrite(); + } + catch (IOException e) { + disconnecting_ = true; + handleDisconnected(Error.WriteError); } - count = socketChannel_.read(byteBuffer); - } - if (count == -1) { - /* socketChannel input has reached "end-of-stream", which - * we regard as meaning that the socket has been closed - */ - throw new IOException("socketChannel_.read returned -1"); } - } catch (IOException ex) { - handleDisconnected(Error.ReadError); - return; } - if (!data.isEmpty()) { - handleDataRead(data); - } + { /* Handle any reading */ + ByteArray dataRead; - try { - Thread.sleep(100); - } catch (InterruptedException ex) { - /* We've been woken up, probably to force us to do something.*/ + if (readNeeded) { + try { + dataRead = doRead(); + if (!dataRead.isEmpty()) { + handleDataRead(dataRead); + } + } catch (IOException ex) { + handleDisconnected(Error.ReadError); + return; + } + } + } + + if (isWriteNeeded() && !disconnected_) { + /* There is something that's not been written yet. + * This might happen because the "doWrite()" didn't + * write the complete buffer, or because our "write()" + * method was called (perhaps more than once) since + * this thread was woken. + * + * Give the buffer a chance to empty + */ + try { + Thread.sleep(100); + } + catch (InterruptedException e) { + /* */ + } + /* Force the next iteration of the loop to wake up + * straight away, and check all conditions again + */ + selector_.wakeup(); } } handleDisconnected(null); - }finally { + } finally { if(socketChannel_ != null) { try { socketChannel_.close(); @@ -125,10 +139,102 @@ public class JavaConnection extends Connection implements EventOwner { } } + /** + * Called when there's something in the writeBuffer to be written. + * Will remove from writeBuffer_ anything that got written. + * @throws IOException if an error occurs when trying to write to the + * socket + */ + private void doWrite() throws IOException { + if (!isWriteNeeded()) { + return; + } + + ByteArray data = writeBuffer_.get(0); + byte[] bytes = data.getData(); + int bytesToWrite = bytes.length; + + if (bytesToWrite == 0) { + /* + * Not sure if this can happen, but does no harm to check + */ + writeBuffer_.remove(0); + return; + } + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + + /* + * Because the SocketChannel is non-blocking, we have to + * be prepared to cope with the write operation not + * consuming all (or any!) of the data + */ + boolean finishedWriting = false; + int bytesWritten = socketChannel_.write(byteBuffer); + finishedWriting = (byteBuffer.remaining() == 0); + if (finishedWriting) { + writeBuffer_.remove(0); + return; + } + /* Was anything written at all? */ + if (bytesWritten == 0) { + /* Leave the buffer in the array so that it'll get tried + * again later + */ + return; + } + + /* The buffer was *partly* written. This means we have to + * remove that part. We do this by creating a new ByteArray + * with the remaining bytes in, and replacing the first + * element in the list with that. + */ + byte[] remainingBytes = new byte[bytesToWrite - bytesWritten]; + System.arraycopy(bytes, bytesWritten,remainingBytes,0, remainingBytes.length); + ByteArray leftOver = new ByteArray(remainingBytes); + + writeBuffer_.set(0, leftOver); + return; + } + + /** + * Called when there's something that's come in on the socket + * @return a ByteBuffer containing bytes read (may be empty, won't be null) + * @throws IOException if the socket got closed + */ + private ByteArray doRead() throws IOException { + + ByteBuffer byteBuffer = ByteBuffer.allocate(1024); + ByteArray data = new ByteArray(); + + int count = socketChannel_.read(byteBuffer); + if (count == 0) { + return data; + } + while (count > 0) { + byteBuffer.flip(); + byte[] result = new byte[byteBuffer.remaining()]; + byteBuffer.get(result); + byteBuffer.compact(); + for (int i=0; i<result.length; i++) { + data.append(result[i]); + } + + count = socketChannel_.read(byteBuffer); + } + if (count == -1) { + /* socketChannel input has reached "end-of-stream", which + * we regard as meaning that the socket has been closed + */ + throw new IOException("socketChannel_.read returned -1"); + } + return data; + } + private void handleConnected(final boolean error) { + eventLoop_.postEvent(new Callback() { public void run() { - onConnectFinished.emit(error); + onConnectFinished.emit(Boolean.valueOf(error)); } }); } @@ -152,9 +258,6 @@ public class JavaConnection extends Connection implements EventOwner { }); } - public void write(ByteArray data) { - writeBuffer_.add(data); - } } private JavaConnection(EventLoop eventLoop) { @@ -176,17 +279,25 @@ public class JavaConnection extends Connection implements EventOwner { worker_ = new Worker(address); Thread workerThread = new Thread(worker_); workerThread.setDaemon(true); + workerThread.setName("JavaConnection "+ address.toString()); workerThread.start(); } @Override public void disconnect() { disconnecting_ = true; + if (selector_ != null) { + selector_.wakeup(); + } } @Override public void write(ByteArray data) { worker_.writeBuffer_.add(data); + if (selector_ != null) { + selector_.wakeup(); + } + } @Override @@ -213,6 +324,8 @@ public class JavaConnection extends Connection implements EventOwner { private boolean disconnecting_ = false; private boolean disconnected_ = false; private SocketChannel socketChannel_; + private Selector selector_; + private SelectionKey selectionKey_; private Worker worker_; } |