Error SocketChannel receive multiple messages at once?
Hello,
I use Java NIO, non blocking for my client-server program.
Everything works ok, until there are many clients that sending messages at the same time to the server.
The server can identify all the clients, and begin reading, but the reading of those multiple clients are always the same message.
For example, client A send "Message A", client B send "Missing Message", client C send "Another Missing Message" at the same time to the server, the server read client A send "Message A", client B send "Message A", client C send "Message A", only happen if the server trying to read all those messages at once, if the server read one by one, it's working perfectly.
What's wrong with my code?
This is on the server, reading the message:
private SelectorpacketReader;// the selector to read client message
publicvoid update(long elapsedTime)throws IOException{
...
if (packetReader.selectNow() > 0){
// message received
Iterator packetIterator = packetReader.selectedKeys().iterator();
while (packetIterator.hasNext()){
SelectionKey key = (SelectionKey) packetIterator.next();
packetIterator.remove();
// there is client sending message, looping until all clients message
// fully read
// construct packet
TCPClient client = (TCPClient) key.attachment();
try{
client.read();// IN HERE ALL THE CLIENTS READ THE SAME MESSAGES
// if only one client send message, it's working, if there are multiple selector key, the message screwed
}catch (IOException ex){
//ex.printStackTrace();
removeClient(client);// something not right, kick this client
}
}
}
On the client, I think this is the culprit :
private ByteBufferreadBuffer;// the byte buffer
private SocketChannel client;// the client SocketChannel
protectedsynchronizedvoid read()throws IOException{
readBuffer.clear();// clear previous buffer
int bytesRead = client.read(readBuffer);// THIS ONE READ THE SAME MESSAGES, I think this is the culprit
if (bytesRead < 0){
thrownew IOException("Reached end of stream");
}elseif (bytesRead == 0){
return;
}
// write to storage (DataInputStream input field storage)
storage.write(readBuffer.array(), 0, bytesRead);
...
// in here the construction of the buffer to real message
}
How could the next client read not from the beginning of the received message but to its actual message (client.read(readBuffer)), i'm thinking to use SocketChannel.read(ByteBuffer[] dsts, , int offset, int length) but don't know the offset, the length, and what's that for :(
Anyone networking gurus, please help...
Thank you very much.
[4459 byte] By [
javaweirda] at [2007-11-27 8:40:45]

# 1
(1) You can't assume that each read delivers an entire message.
(2) You're sharing the same ByteBuffer between all your clients so you're running some considerable risks if (1) doesn't happen. I would need to see the code that does 'the construction of the buffer to real message' but I would guess that you don't take any notice of the buffer's limit, or you don't flip it, or you don't use bytesRead in there. So you're probably getting the data all mixed up - you'll probalby be seeing a new client message followed by whatever was there from the last time, or the time before.
I recommend you run a ByteBuffer per client and have a good look at its API and the NIO examples.
ejpa at 2007-7-12 20:39:20 >

# 2
Hello ejp, thanks for the reply.
> (1) You can't assume that each read delivers an entire message.
Yep I know about this, like I'm saying everything is okay when all the clients send the message not in the same time, but when the server tries to read client message at the same time, for example there are 3 clients message arrive at the same time and the server tries to read it, the server construct all the message exactly like the first message arrived.
This is the actual construction of the message, read the length of the packet first, then construct the message into DataInputStream, and read the message from it:
// packet stream reader
private DataInputStream input;
private NewPipedOutputStream storage;
private booleanwaitingForLength = true;
private int length;
protected synchronized void read() throws IOException {
readBuffer.clear(); // clear previous byte buffer
int bytesRead = client.read(readBuffer); // read how many bytes read
if (bytesRead < 0) {
throw new IOException("Reached end of stream");
} else if (bytesRead == 0) {
return;
}
// the bytes read is used to fill the byte buffer
storage.write(readBuffer.array(), 0, bytesRead);
// after read the packet
// this is the message construction
// write to byte buffer to input storage
// (this will write into DataInputStream)
storage.write(readBuffer.array(), 0, bytesRead);
// unpack the packet
while (input.available() > 0) {
// unpack the byte length first
if (waitingForLength) { // read the packet length first
if (input.available() > 2) {
length = input.readShort();
waitingForLength = false;
} else {
// the length has not fully read
break; // wait until the next read
}
// construct the packet if the length already known
} else {
if (input.available() >= length) {
// store the content to data
byte[] data = new byte[length];
input.readFully(data);
// add to received packet
addReceivedPacket(data);
waitingForLength = true; // wait for another packet
} else {
// the content has not fully read
break; // wait until next read
}
}
}
}
> (2) You're sharing the same ByteBuffer between all your clients
> so you're running some considerable risks if (1) doesn't happen.
> I recommend you run a ByteBuffer per client and have a good look
> at its API and the NIO examples.
Yep, I already use one ByteBuffer per client, it's not shared among the clients, this is the class for each client:
private SocketChannel client; // socket channel per client
private ByteBuffer readBuffer; // byte buffer per client
private Selector packetReader; // the selector that is shared among all clients
private static final int BUFFER_SIZE = 10240; // default huge buffer size for reading
private void init() throws IOException {
storage; // the packet storage writer
input; // the actual packet input
readBuffer = ByteBuffer.allocate(BUFFER_SIZE); // the byte buffer is one per client
readBuffer.order(ByteOrder.BIG_ENDIAN);
client.configureBlocking(false);
client.socket().setTcpNoDelay(true);
// register packet reader
// one packetReader used by all clients
client.register(packetReader, SelectionKey.OP_READ, this);
}
Cos the ByteBuffer is not shared, I think it's impossible that it mixed up with other clients, what I think is the SocketChannel client.read(ByteBuffer) that fill the byte buffer with the same packet?
> So you're probably getting the data all mixed up - you'll probalby be seeing a new client message followed by whatever was there from the last time, or the time before.
If the server not trying to read all the messages at the same time, it works fine, I think that if the client SocketChannel filled up with multiple client messages, the SocketChannel.read(...) only read the first client message.
Or am I doing something wrong, I could fasten the server read by removing the Thread.sleep(100); but I think that's not the good solution, since in NIO there should be time where the server need to read client multiple messages at once.
Any other thoughts?
Thanks again.
# 3
You definitely don't need the sleep(100). That's just literally a waste of time.
If you have a buffer per client there is no need to clear the buffer, and if you're trying to cope with partial reads etc this is counter-productive.
I don't understand why you're using a PipedOutputStream, presumably hooked up to a PipedInputStream somewhere, presumably hooked up to 'DataInputStream input'. Using Pipes implies another thread, per client, and the whole point of NIO is to avoid threads per client. Are you sure the duplication isn't happening in there? If you're not using an extra thread for the PipedInputStream you are liable to deadlock.
You need to reconsider all of this. Read the data into the same buffer, without clearing it, until you've got it all. When you've got enough data to represent the length word, read that word via ByteBuffer.asIntBuffer().readShort(), and advance the ByteBuffer's 'position' yourself. When you've read the whole message, compact the buffer. Get rid of all this Piped nonsense, it is a flying buttress on a B47.
ejpa at 2007-7-12 20:39:20 >

# 4
the client send the message, or you log the message when a buffer is available ?
A commom error is to log messages on Channel.canWrite(), register the channels with WRITE cause 100% CPU, cause they always can be writted.
ByteBuffers can be cleaned, You don't need to use one for each client or new received buffer.
cya
# 5
> the client send the message, or you log the message
> when a buffer is available ?
This question doesn't make any sense.
> A commom error is to log messages on
> Channel.canWrite()
There is no such method.
> ByteBuffers can be cleaned, You don't need to use one
> for each client or new received buffer.
Not if you have to deal with partial messages, which you do. As already said above.
I recommend you read the following:
http://forum.java.sun.com/thread.jspa?threadID=459338
http://www.telekinesis.com.au/wipv3_6/FundamentalNetworkingInJava.A21
and please write more clearly.
ejpa at 2007-7-12 20:39:21 >

# 6
I'm sorry, I probaly misunderstood the topic.And I'll try to get a better english.bye.
# 7
Sorry for the late reply,
I need to continue developing the application, this networking stuff has been hold me a few days, so I made a quick hack, shorten the server sleep to Thread.sleep(5), and everything goes fine, for now, get back to continue the application development....
The sleep(5) fix it because the server won't read multiple client messages at a time for quick loop.
I give some dukes for ejp, thanks for mention sleep(100) is not needed! I'm just trying to be nice to the server loop :-)
But this not the real solution, I need to truly fix the client packet reading.
> I don't understand why you're using a PipedOutputStream,
> presumably hooked up to a PipedInputStream somewhere,
> presumably hooked up to 'DataInputStream input'.
Yep, the NewPipedOutputStream is hooked to NewPipedInputStream, and the last yes it goes into DataInputStream, my program construct the packet from this DataInputStream, input.readInt(), readUTF(), readShort(), ...
The ByteBuffer writes it's data to NewPipedInputStream, and then the NewPipedInputStream read fully the data into byte array (byte[]).
This byte array goes into DataInputStream:
byte[] data = new byte[length];
input.readFully(data); // the NewPipedInputStream read the data into byte array
// the byte array is constructed to DataInputStream
ByteArrayInputStream bain= new ByteArrayInputStream(data);
DataInputStream input= new DataInputStream(bain);
// construct the packet from the DataInputStream
input.readXXX();
Well everything goes fine in here, I'll later improve the performance, I juse need to fix those SocketChannel.read(ByteBuffer) that return invalid data when the SocketChannel read the data multiple times, when Selector.selectNow() return multiple clients.
What's wrong with this?
When only reading one client packet, it works charm, but screwed up for sure when reading multiple packets at once:
The problem is still the same :
private Selector packetReader;
if (packetReader.selectNow() > 0) {
// packet received
Iterator packetIterator = packetReader.selectedKeys().iterator();
while (packetIterator.hasNext()) {
SelectionKey key = (SelectionKey) packetIterator.next();
packetIterator.remove();
TCPClient client = (TCPClient) key.attachment();
client.read();
// if there is another client, the packet is same with the first
// or the last, I'm not sure
}
}
The packet reading :
private SocketChannel client;
private ByteBuffer readBuffer;
// registering packetReader Selector
private Selector packetReader; // shared on all clients, the server hold this
client.register(packetReader, SelectionKey.OP_READ, this);
// reading packet
readBuffer.clear();
int bytesRead = client.read(readBuffer); // this return the same data?
I'm still don't know how to fix this, can't find what to be changed :-(
I think I will use one Selector per client, as right now, the Selector is shared among all clients.
But if one Selector per client, if there are many clients, hundreds clients, I must iterate all the clients to see if there is packet for that client using Selector.selectNow(), and if the same clients sending multiple messages at once, back to the same problem, guess this is not the right way for sure :-(
> Using Pipes implies another thread, per client,
> and the whole point of NIO is to avoid threads per client.
No, I only using one thread, the server read two Selector, one for reading new clients, and the other is the buggy one, reading clients messages, if there is messages, the reading message is delegated to the client SocketChannel.read.
Any other solutions? For reading without NewPipedInputStream/NewPipedOutputStream, I don't know how to change those ByteBuffer into byte array and construct it into DataInputStream/DataOutputStream, I use DataInputStream/DataOutputStream to read/send packet.
Any help is greatly appreaciated.
# 8
OMG, it's all my fault....!!
I debug my framework once again comprehensively and....
Sorry, the problem is my networking framework!
I use static field for received packet on the clients....
That's why when incoming multiple packets, the packet for all clients have the same content.
Sorry for this misunderstanding post, reward all the dukes to ejp that patiently reply to my silliness.
Thanks to all, I must recheck my networking framework once again before posting.
Anyway, other thing pop in my mind now, what do you mean by using the ByteBuffer directly ejp? How could I read the data inside it into Java types?
Right now I'm using DataInputStream wrapped on NewPipedInputStream/NewPipedOutputStream from byte array that constructed by ByteArrayInputStream, that's quite a long journey of network packet :-)
Sorry again for the basic misunderstanding, static field sometime could breaks OOP-ness awfully if not carefully used :-)