Write blocked waiting on read
Hi all,
I have been experiencing difficulties with simultaneous read/write with AsyncIO. For my scenario that the client/server application is being developed, a client may send requests or status information at any given time. Hence continual monitoring on the incoming stream is required.
The server shall respond depending upon the status message requiring a write to the client (so it won't respond to all messages received). Likewise, the client in some instances will only send status messages depending on the last message it received from the server.
I've been experiencing difficulties with writes blocking because of reading. Essentially I would like to continually poll reading whilst allowing writes to be flushed immediately.
Using traditional blocking read/writes things hang indefinitely whilst attempting to write a message as reading has blocked waiting for input.
Using the IBM AsyncIO package (which is purported to be faster implementation of NIO), writing blocks for some time until reading (i assume) relinquishes the socket to allow writing to occur before resuming reading again. The lag time in this situation is significant.
Is someone able to provide an example using non-blocking R/W in which a server can sit reading (on one thread) whilst the writing is attempted on another thread that doesn't cause any lag?
Below is a basic overview of what is happening in my software:
publicclass MessageQueue{
private LinkedList<Message> queue;
/** Creates a new instance of MessageQueue */
public MessageQueue(){
queue =new LinkedList<Message>();
}
publicsynchronizedvoid put(Message message){
queue.add( message );
notifyAll();
}
publicsynchronizedboolean isEmpty(){
return queue.isEmpty();
}
publicsynchronized Message get(){
while( queue.isEmpty() ){
try{
wait();
}catch( InterruptedException ie ){
;
}
}
Message message = ( Message )queue.removeFirst();
return message;
}
publicsynchronizedvoid close(){
queue.clear();
queue =null;
}
}
publicclass InputReaderimplements Runnable{
private MessageQueue messages;
private AsyncSocketChannel async_channel;
...
public InputReader(MessageQueue messages, AsyncSocketChannel async_channel){
this.messages = messages;
this.async_channel = async_channel;
}
publiclong read(ByteBuffer b){
long bytes_read = 0;
helper =new AsyncSocketChannelHelper( this.async_channel );
future = channel.read(b);
bytes_read = future.getByteCount( );
return bytes_read;
}
publicvoid run(){
ByteBuffer b = ByteBuffer.allocateDirect(Message.SIZE);
boolean running =true;
while(running){
if (read(b) == 0)
running =false;
else
messages.put(new Message(b));
}
}
}
publicclass OutputWriterimplements Runnable{
private MessageQueue messages;
private AsyncSocketChannel async_channel;
...
public OutputWriter(MessageQueue messages, AsyncSocketChannel async_channel){
this.messages = messages;
this.async_channel = async_channel;
}
publiclong write(ByteBuffer b){
long bytes_written = 0;
try{
AsyncSocketChannelHelper helper =new AsyncSocketChannelHelper( this.async_channel );
IAsyncFuture future = helper.write(b, 20000);// write or timeout
// wait for completion of write, or for the timeout to happen
bytes_written = future.getByteCount( );
// THIS IS WHERE THE PROBLEM LIES. The write does not happen straight away because of the read operation. With traditional blocking IO this locks completely.
}catch ( AsyncTimeoutException ate){
System.err.println("Timed out after 20 seconds");
}
return bytes_written;
}
publicvoid run(){
boolean running =true;
while(running){
Message m = this.messages.get();
if (write(m.getByteBuffer()) == 0)
running =false;
else
messages.put(new Message(b));
}
}
}
publicclass Controller{
public Controller(AsyncSocketChannel async_channel){
MessageQueue in =new MessageQueue();
MessageQueue out =new MessageQueue();
InputReader ir =new InputReader(out, async_channel);
OutputWriter ow =new OutputWriter(out, async_channel);
new Thread(ow).start();
new Thread(ir).start();
boolean running =true;
Message m;
while(running){
m = in.get();
if (m.getStatus() =="REQUIRES_RESPONSE"))
out.put(m);// dummy example to demonstrate that once the right condition is met, a new message must be written
}
}
}

