summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/isode/stroke/network/JavaConnection.java')
-rw-r--r--src/com/isode/stroke/network/JavaConnection.java237
1 files changed, 175 insertions, 62 deletions
diff --git a/src/com/isode/stroke/network/JavaConnection.java b/src/com/isode/stroke/network/JavaConnection.java
index 7814f71..efbf54f 100644
--- a/src/com/isode/stroke/network/JavaConnection.java
+++ b/src/com/isode/stroke/network/JavaConnection.java
@@ -3,7 +3,7 @@
* All rights reserved.
*/
/*
- * Copyright (c) 2010-2012, Isode Limited, London, England.
+ * Copyright (c) 2010-2013, Isode Limited, London, England.
* All rights reserved.
*/
package com.isode.stroke.network;
@@ -12,6 +12,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
@@ -32,6 +34,10 @@ public class JavaConnection extends Connection implements EventOwner {
public Worker(HostAddressPort address) {
address_ = address;
}
+
+ private boolean isWriteNeeded() {
+ return (!writeBuffer_.isEmpty());
+ }
public void run() {
try {
@@ -42,79 +48,87 @@ public class JavaConnection extends Connection implements EventOwner {
* isn't what we want
*/
socketChannel_.configureBlocking(false);
+ selector_ = Selector.open();
+ selectionKey_ = socketChannel_.register(selector_, SelectionKey.OP_READ);
} catch (IOException ex) {
handleConnected(true);
return;
}
handleConnected(false);
while (!disconnecting_) {
- while (!writeBuffer_.isEmpty()) {
- ByteArray data = writeBuffer_.get(0);
- ByteBuffer byteBuffer = ByteBuffer.wrap(data.getData());
- try {
- /* Because the SocketChannel is non-blocking, we have to
- * be prepared to cope with the write operation not
- * consuming all of the data
- */
- boolean finishedWriting = false;
- while (!finishedWriting && !disconnecting_) {
- socketChannel_.write(byteBuffer);
- finishedWriting = (byteBuffer.remaining() == 0);
- if (!finishedWriting) {
- try {
- /* Give the output buffer a chance to empty */
- Thread.sleep(100);
- }
- catch (InterruptedException e) {
- /* Perhaps someone has set disconnecting_ */
- }
- }
- }
- } catch (IOException ex) {
- disconnecting_ = true;
- handleDisconnected(Error.WriteError);
- }
- writeBuffer_.remove(0);
- }
- ByteArray data = new ByteArray();
+ /* This will block until something is ready on the selector,
+ * including someone calling selector.wakeup(), or until the
+ * thread is interrupted
+ */
try {
- ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
-
- int count = socketChannel_.read(byteBuffer);
- while (count > 0) {
- byteBuffer.flip();
- byte[] result = new byte[byteBuffer.remaining()];
- byteBuffer.get(result);
- byteBuffer.compact();
- for (int i=0; i<result.length; i++) {
- data.append(result[i]);
+ selector_.select();
+ } catch (IOException e) {
+ disconnected_ = true;
+ handleDisconnected(null);
+ break;
+ }
+
+ /* Something(s) happened. See what needs doing */
+ if (disconnecting_) {
+ handleDisconnected(null);
+ /* No point doing anything else */
+ break;
+ }
+ boolean writeNeeded = isWriteNeeded();
+ boolean readNeeded = selectionKey_.isReadable();
+
+ { /* Handle any writing */
+ if (writeNeeded) {
+ try {
+ doWrite();
+ }
+ catch (IOException e) {
+ disconnecting_ = true;
+ handleDisconnected(Error.WriteError);
}
- count = socketChannel_.read(byteBuffer);
- }
- if (count == -1) {
- /* socketChannel input has reached "end-of-stream", which
- * we regard as meaning that the socket has been closed
- */
- throw new IOException("socketChannel_.read returned -1");
}
- } catch (IOException ex) {
- handleDisconnected(Error.ReadError);
- return;
}
- if (!data.isEmpty()) {
- handleDataRead(data);
- }
+ { /* Handle any reading */
+ ByteArray dataRead;
- try {
- Thread.sleep(100);
- } catch (InterruptedException ex) {
- /* We've been woken up, probably to force us to do something.*/
+ if (readNeeded) {
+ try {
+ dataRead = doRead();
+ if (!dataRead.isEmpty()) {
+ handleDataRead(dataRead);
+ }
+ } catch (IOException ex) {
+ handleDisconnected(Error.ReadError);
+ return;
+ }
+ }
+ }
+
+ if (isWriteNeeded() && !disconnected_) {
+ /* There is something that's not been written yet.
+ * This might happen because the "doWrite()" didn't
+ * write the complete buffer, or because our "write()"
+ * method was called (perhaps more than once) since
+ * this thread was woken.
+ *
+ * Give the buffer a chance to empty
+ */
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e) {
+ /* */
+ }
+ /* Force the next iteration of the loop to wake up
+ * straight away, and check all conditions again
+ */
+ selector_.wakeup();
}
}
handleDisconnected(null);
- }finally {
+ } finally {
if(socketChannel_ != null) {
try {
socketChannel_.close();
@@ -125,10 +139,102 @@ public class JavaConnection extends Connection implements EventOwner {
}
}
+ /**
+ * Called when there's something in the writeBuffer to be written.
+ * Will remove from writeBuffer_ anything that got written.
+ * @throws IOException if an error occurs when trying to write to the
+ * socket
+ */
+ private void doWrite() throws IOException {
+ if (!isWriteNeeded()) {
+ return;
+ }
+
+ ByteArray data = writeBuffer_.get(0);
+ byte[] bytes = data.getData();
+ int bytesToWrite = bytes.length;
+
+ if (bytesToWrite == 0) {
+ /*
+ * Not sure if this can happen, but does no harm to check
+ */
+ writeBuffer_.remove(0);
+ return;
+ }
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+
+ /*
+ * Because the SocketChannel is non-blocking, we have to
+ * be prepared to cope with the write operation not
+ * consuming all (or any!) of the data
+ */
+ boolean finishedWriting = false;
+ int bytesWritten = socketChannel_.write(byteBuffer);
+ finishedWriting = (byteBuffer.remaining() == 0);
+ if (finishedWriting) {
+ writeBuffer_.remove(0);
+ return;
+ }
+ /* Was anything written at all? */
+ if (bytesWritten == 0) {
+ /* Leave the buffer in the array so that it'll get tried
+ * again later
+ */
+ return;
+ }
+
+ /* The buffer was *partly* written. This means we have to
+ * remove that part. We do this by creating a new ByteArray
+ * with the remaining bytes in, and replacing the first
+ * element in the list with that.
+ */
+ byte[] remainingBytes = new byte[bytesToWrite - bytesWritten];
+ System.arraycopy(bytes, bytesWritten,remainingBytes,0, remainingBytes.length);
+ ByteArray leftOver = new ByteArray(remainingBytes);
+
+ writeBuffer_.set(0, leftOver);
+ return;
+ }
+
+ /**
+ * Called when there's something that's come in on the socket
+ * @return a ByteBuffer containing bytes read (may be empty, won't be null)
+ * @throws IOException if the socket got closed
+ */
+ private ByteArray doRead() throws IOException {
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ ByteArray data = new ByteArray();
+
+ int count = socketChannel_.read(byteBuffer);
+ if (count == 0) {
+ return data;
+ }
+ while (count > 0) {
+ byteBuffer.flip();
+ byte[] result = new byte[byteBuffer.remaining()];
+ byteBuffer.get(result);
+ byteBuffer.compact();
+ for (int i=0; i<result.length; i++) {
+ data.append(result[i]);
+ }
+
+ count = socketChannel_.read(byteBuffer);
+ }
+ if (count == -1) {
+ /* socketChannel input has reached "end-of-stream", which
+ * we regard as meaning that the socket has been closed
+ */
+ throw new IOException("socketChannel_.read returned -1");
+ }
+ return data;
+ }
+
private void handleConnected(final boolean error) {
+
eventLoop_.postEvent(new Callback() {
public void run() {
- onConnectFinished.emit(error);
+ onConnectFinished.emit(Boolean.valueOf(error));
}
});
}
@@ -152,9 +258,6 @@ public class JavaConnection extends Connection implements EventOwner {
});
}
- public void write(ByteArray data) {
- writeBuffer_.add(data);
- }
}
private JavaConnection(EventLoop eventLoop) {
@@ -176,17 +279,25 @@ public class JavaConnection extends Connection implements EventOwner {
worker_ = new Worker(address);
Thread workerThread = new Thread(worker_);
workerThread.setDaemon(true);
+ workerThread.setName("JavaConnection "+ address.toString());
workerThread.start();
}
@Override
public void disconnect() {
disconnecting_ = true;
+ if (selector_ != null) {
+ selector_.wakeup();
+ }
}
@Override
public void write(ByteArray data) {
worker_.writeBuffer_.add(data);
+ if (selector_ != null) {
+ selector_.wakeup();
+ }
+
}
@Override
@@ -213,6 +324,8 @@ public class JavaConnection extends Connection implements EventOwner {
private boolean disconnecting_ = false;
private boolean disconnected_ = false;
private SocketChannel socketChannel_;
+ private Selector selector_;
+ private SelectionKey selectionKey_;
private Worker worker_;
}