Selector reads alternate "packets"...
I have a Selector that does network reads to a queue and network writes from a queue, as well as 2 threads (a Producer and a Consumer) that add to and remove from those queues. The elements of the queues are Packets, which are simple (Length, Type, Data) tuple objects.
Everything seems to be working except for the Selector reads. When I test, I see that the network writes all go through. However, the network reads, which should correspond one-to-one to the writes, instead pick up only (roughly) every other Packet.
Some clues:
1) The dropped packets are always wholly (not partially) dropped.
2) The Packet consists of an array of 3 ByteBuffers (Length, Type, Data). When the Selector triggers an OP_READ, I bulk read into this array. (This is done in 2 successive reads: once for the length, once for the rest.) If the array is not filled, the Packet remains a "partial packet" and will continue to be filled on the next OP_READ. If it is filled, it is moved to the queue for the Consumer. In either case, I only read enough off the channel for the one packet. I then fall through, assuming the Selector will trigger again immediately if I left readable data in the channel, and then I do the same all over again. Is this an incorrect assumption?
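On the assumption in clue 2: Java NIO readiness selection is generally understood to be level-triggered, so a channel that still holds unread bytes should be reported readable again on the next select(), provided the key was removed from the selected-key set in between. A minimal sketch that checks this, using a java.nio.channels.Pipe as a stand-in for the socket (LevelTriggeredDemo and countReadableEvents are made-up names for illustration):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class LevelTriggeredDemo {
    /** Writes 8 bytes, drains them 4 at a time, and counts how often select() reported OP_READ. */
    static int countReadableEvents() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);
                sink.write(ByteBuffer.wrap(new byte[8]));

                int events = 0;
                int total = 0;
                while (total < 8) {
                    if (selector.select(1000) == 0) {
                        break; // timed out: no further readiness was reported
                    }
                    // Selected keys are not removed automatically; clear them by hand.
                    selector.selectedKeys().clear();
                    events++;
                    // Deliberately read at most 4 bytes so data is left in the channel.
                    int n = source.read(ByteBuffer.allocate(4));
                    if (n > 0) {
                        total += n;
                    }
                }
                return events;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("readable events: " + countReadableEvents());
    }
}
```

If the count comes back as 2 or more, the leftover bytes did trigger another OP_READ, which suggests the fall-through approach is not in itself what loses packets.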
Any ideas?
Dan
By dkopko123 at 2007-9-30 3:42:50

I've since re-implemented it, so #2 is no longer correct. I now read as many packets as I can on one trigger of Select on OP_READ.
Still the same problem, though.
Good News: Even with very conservative synchronization, this runs like lightning.
Bad News: Out of 100 packets, 10 went missing.
--Dan
If this is UDP, it is unreliable and there is nothing you can do about it except build retries into your application protocol. If this is TCP, it is reliable, so you have bugs in your application code.
EJP
ejp at 2007-6-29 14:48:53

Thanks, it should be TCP, and I'm currently trying hard on the bug front.
If I log into my node via telnet, it sees all the writes. However the reader half of my node doesn't see them all. Sounds like a synch issue, but I know how to do synchronization and I've pored over the code.
Do I have to synch on any underlying sockets or anything in order to use the bulk reads/writes off of a SocketChannel?
--Dan
I figured out the solution to my problem if anyone is interested. However, I'm still not quite sure of the reasoning.
Preliminaries: I have a packetStructure which is a ByteBuffer[] {lengthField, typeField, dataField}.
So, when I do bulk reads, I want to make sure I get all the lengthField first. Then I want to set the limit of the dataField before I allow the read to continue. So it is a two step process.
My original code (this locks for some reason):
    if (lengthField.remaining() != 0) {
        amtRead1 = channel.read(packetStructure,
                                lengthField.position(),
                                lengthField.remaining() - 1);
        ...
    }
My new code (this works fine):
    if (lengthField.remaining() != 0) {
        amtRead1 = channel.read(lengthField);
        ...
    }
So anyways, the original one works fine except in 2 cases:
1) On a loopback connection to the same computer
2) Sporadic failures on a fast stream of packets. (about 1-10% drop rate)
The new one works fine always.
Note also that I do not synchronize anywhere in my code on either packetStructure or lengthField.
Anyways, I'm happy now, my code is working. If anyone's up for a puzzle, feel free to tackle why there's a difference in the two above...
Thanks,
Dan
I think the problem is the offset argument. According to API:
"offset - The offset within the buffer array of the first buffer into which bytes are to be transferred; must be non-negative and no larger than dsts.length"
Thus in order to read into the first ByteBuffer it must be 0.
You need to keep reading until you have read all the expected data, using the result of read() to tell you how much was actually read.
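To illustrate the point about the offset argument: in read(ByteBuffer[] dsts, int offset, int length), both offset and length count buffers in the array, not bytes within a buffer. The sketch below uses a Pipe as a stand-in for the SocketChannel and made-up field sizes (4-byte length, 1-byte type, 1-byte data); ScatterReadOffsetDemo and scatter are hypothetical names. With an empty 4-byte lengthField, the first call in the original code works out to read(packetStructure, 0, 3), which can spill past lengthField into the other two buffers. That is one plausible reason packets went missing on fast streams, though nothing in this thread confirms it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.util.Arrays;

class ScatterReadOffsetDemo {
    /** Writes 6 bytes into a pipe, does one scatter read, and returns the bytes landed per buffer. */
    static int[] scatter(int offset, int length) {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                sink.write(ByteBuffer.wrap(new byte[] {0, 0, 0, 1, 7, 9}));

                // Assumed field sizes, for illustration only.
                ByteBuffer lengthField = ByteBuffer.allocate(4);
                ByteBuffer typeField = ByteBuffer.allocate(1);
                ByteBuffer dataField = ByteBuffer.allocate(1);
                ByteBuffer[] packetStructure = {lengthField, typeField, dataField};

                // offset = index of the first buffer to fill, length = number of buffers to use.
                source.read(packetStructure, offset, length);
                return new int[] {
                    lengthField.position(), typeField.position(), dataField.position()
                };
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Only the first buffer is eligible: equivalent to channel.read(lengthField).
        System.out.println(Arrays.toString(scatter(0, 1)));
        // What position() == 0 and remaining() - 1 == 3 amount to: all three buffers are eligible.
        System.out.println(Arrays.toString(scatter(0, 3)));
    }
}
```

Passing an offset larger than the array length (for example a byte position of 4 with only 3 buffers) is rejected with an IndexOutOfBoundsException rather than silently ignored.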
ejp at 2007-6-29 14:48:53

> I think the problem is the offset argument. According
> to API:
>
> "offset - The offset within the buffer array of
> the first buffer into which bytes are to be
> transferred; must be non-negative and no larger than
> dsts.length"
>
> Thus in order to read into the first ByteBuffer it
> must be 0.
The first iteration through this, position() == 0. Subsequent iterations of the code deal with updated position until position = length-1.
I don't think that's the problem.
--Dan
> You need to keep reading until you have read all the
> expected data, using the result of read() to tell you
> how much was actually read.
That read repeats, sorry I wasn't clear. It repeats until position == length-1.
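For anyone landing on this thread later, the "keep reading until you have everything" advice can be sketched as a small reader that keeps partial-packet state between calls and reads one buffer at a time, so no offset arithmetic is involved. The layout (4-byte big-endian data length, then a 1-byte type, then the data) and the names PacketReader, readAvailable and feed are assumptions for illustration, not Dan's actual code. The in-memory channel in feed only stands in for a SocketChannel.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PacketReader {
    // Assumed layout: 4-byte big-endian data length followed by a 1-byte type.
    private final ByteBuffer header = ByteBuffer.allocate(5);
    private ByteBuffer data; // null until a complete header has been read

    /** Reads until the channel has nothing more for now; returns the data of each completed packet. */
    List<byte[]> readAvailable(ReadableByteChannel channel) throws IOException {
        List<byte[]> completed = new ArrayList<>();
        while (true) {
            ByteBuffer target = (data == null) ? header : data;
            if (target.hasRemaining() && channel.read(target) <= 0) {
                return completed; // 0 = nothing available yet, -1 = end of stream; state is kept
            }
            if (target.hasRemaining()) {
                continue; // short read: use the result of read() and try again
            }
            if (target == header) {
                header.flip();
                int length = header.getInt(); // a real reader should validate this value
                header.get();                 // type byte, unused in this sketch
                header.clear();
                data = ByteBuffer.allocate(length);
            } else {
                completed.add(data.array());
                data = null;
            }
        }
    }

    /** Helper for the demo: feeds one chunk of bytes through an in-memory channel. */
    static List<byte[]> feed(PacketReader reader, byte[] chunk) {
        try {
            return reader.readAvailable(Channels.newChannel(new ByteArrayInputStream(chunk)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PacketReader reader = new PacketReader();
        // Two packets, split mid-way through the first packet's data.
        System.out.println(feed(reader, new byte[] {0, 0, 0, 3, 7, 1}).size());
        for (byte[] d : feed(reader, new byte[] {2, 3, 0, 0, 0, 1, 8, 4})) {
            System.out.println(Arrays.toString(d));
        }
    }
}
```

With a non-blocking SocketChannel the same method would be called once per OP_READ; it returns as soon as read() reports 0 and picks up where it left off on the next trigger.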