Yes, I use the same thread.
After registering with the selector (step 2), it takes about 3 seconds, and the loop executes several times, before I get the key.
--
The code looks like this (the register call is not shown, but it is done):
while (true) {
/* there is a list that stores the requests; this section takes the requests and registers them */
{
1. accept requests such as client socket connect, read, write
2.register to selector ops
}
/* select from the selector and deal with the result */
{
selector.select(timeout);// timeout=300ms
3. do with selectKey
}
}
Message was edited by:
lanbangbang
Message was edited by:
lanbangbang
I am using SEDA to try this. If you have not used it, see the code:
// Main loop of the selector thread, as posted. This is partly pseudocode: the
// two lines beginning with "1." are prose placeholders for elided code, and
// the braces do not balance as posted (see the note further down).
public void run() {
while (true) {
// Timestamp taken at the start of each iteration; not read again in this excerpt.
begin = System.currentTimeMillis();
try {
aggTarget = tp.getAggregationTarget();
// While the select source has no active keys, run the (elided) handler work instead of selecting.
while (selsource != null && selsource.numActive() == 0) {
1.do with handler,this block 300s to read List(spend less than 400ms)
}
// Poll the select source SELECT_SPIN times per outer iteration.
for (int s = 0; s < SELECT_SPIN; s++) {
SelectQueueElement ret[];
// aggTarget == -1 selects the "dequeue everything" call; otherwise at most aggTarget elements are requested.
if (aggTarget == -1) {
ret = (SelectQueueElement[])selsource.blocking_dequeue_all(SELECT_TIMEOUT);
} else {
ret = (SelectQueueElement[])selsource.blocking_dequeue(SELECT_TIMEOUT, aggTarget);
}
if (ret != null) {
// deal with all ret
}
}
// Three passes of (elided) request processing.
for (int s = 0; s < 3; s++) {
1. do request(no block,spend less than 100ms)
}
// NOTE(review): the brace below has no matching opener in the posted text; a line was presumably lost when pasting.
}
// NOTE(review): yield() is a static method of Thread; calling it through currentThread() has the same effect.
Thread.currentThread().yield();
} catch (Exception e) {
System.err.println(name+": got exception "+e);
e.printStackTrace();
}
}
}
--
/**
 * Registers a channel with this source's selector for the given interest ops
 * and notifies any thread waiting on {@code blocker}.
 *
 * @param nio_sc_obj the channel to register; must be a SelectableChannel
 * @param ops        the interest-op bit mask passed to the channel's register call
 * @return the resulting SelectionKey, or null if the argument is not a
 *         SelectableChannel or the channel is already closed
 */
public Object register(Object nio_sc_obj, int ops) {
    if (DEBUG) {
        System.err.println("NIOSelectSource ("+name+"): register " +nio_sc_obj +
            " : " + this);
    }

    // Reject anything that cannot be registered with a selector.
    boolean selectable = nio_sc_obj instanceof SelectableChannel;
    if (!selectable) {
        System.err.println(
            "register() called with non SelectableChannel argument. " +
            "Should not happen!!"
        );
        return null;
    }

    SelectableChannel channel = (SelectableChannel) nio_sc_obj;
    synchronized (blocker) {
        SelectionKey key = null;
        try {
            key = channel.register(selector, ops);
        } catch (ClosedChannelException cce) {
            // A closed channel is reported and yields a null key.
            System.err.println("Closed Channel Exception: " + cce);
        }
        if (DEBUG) {
            System.err.println("returning " + key);
            System.err.println("numactive = " + numActive());
        }
        // Wake a dequeuer that is waiting on the blocker.
        blocker.notify();
        return key;
    }
}
/**
 * Cancels a selection key previously obtained from this source and notifies
 * any thread waiting on {@code blocker}.
 *
 * @param selkey_obj the key to cancel; must be a SelectionKey, otherwise the
 *                   call is reported on stderr and ignored
 */
public void deregister(Object selkey_obj) {
    if (DEBUG) {
        System.err.println("NIOSelectSource ("+name+"): deregister "+selkey_obj);
    }

    if (selkey_obj instanceof SelectionKey) {
        SelectionKey key = (SelectionKey) selkey_obj;
        synchronized (blocker) {
            key.cancel();
            // The original source carried a commented-out selector.selectNow()
            // at this point ("This must be done so that calls to close()
            // actually close."); it remains disabled here.
            blocker.notify();
        }
        return;
    }

    System.err.println(
        "deregister() called on NIOSelectSource with non SelectionKey " +
        "argument. Should not happen!!"
    );
}
/**
 * Counts the keys registered with the selector that are still valid and have
 * a non-zero interest-op mask. The scan runs while holding {@code blocker}.
 *
 * @return the number of such keys
 */
public int numActive() {
    int count = 0;
    synchronized (blocker) {
        for (Iterator it = selector.keys().iterator(); it.hasNext(); ) {
            SelectionKey key = (SelectionKey) it.next();
            // Validity is checked first: interestOps() is only queried on valid keys.
            if (!key.isValid()) {
                continue;
            }
            if (key.interestOps() != 0) {
                count++;
            }
        }
    }
    return count;
}
/**
 * Returns the difference between {@code ready_size} and {@code ready_offset},
 * read while holding this object's monitor.
 *
 * @return {@code ready_size - ready_offset}
 */
public int size() {
    final int remaining;
    synchronized (this) {
        remaining = ready_size - ready_offset;
    }
    return remaining;
}
/**
 * Dequeues a single ready selection key, polling the selector if the
 * previously fetched batch is empty or fully consumed.
 *
 * If the selector has no registered keys, the call first waits on
 * {@code blocker} for a registration: not at all when timeout_millis is 0,
 * indefinitely when it is -1, otherwise for at most timeout_millis.
 *
 * @param timeout_millis 0 = do not block, -1 = block indefinitely, otherwise
 *                       the maximum time in milliseconds for the wait and for
 *                       the poll
 * @return a queue element wrapping the next ready key, or null if none is
 *         available
 */
public synchronized QueueElementIF blocking_dequeue(int timeout_millis) {
    if (DEBUG) System.err.println("NIOSelectSource ("+name+"): blocking_dequeue called");
    synchronized (blocker) {
        if (selector.keys().size() == 0) {
            if (DEBUG) System.err.println("No keys in selector");
            if (timeout_millis == 0) return null;
            // Wait for something to be registered
            try {
                if (timeout_millis == -1) {
                    blocker.wait();
                } else {
                    blocker.wait(timeout_millis);
                }
            } catch (InterruptedException ie) {
                // An interrupted wait simply falls through to the poll below.
            }
        }
    }
    if ((ready_size == 0) || (ready_offset == ready_size)) {
        doPoll(timeout_millis);
    }
    // The batch can still be exhausted here if the poll found nothing; the
    // original only tested ready_size == 0 and then indexed one past the end
    // of the ready array.
    if ((ready_size == 0) || (ready_offset >= ready_size)) {
        if (DEBUG) System.err.println("still no ready");
        return null;
    }
    return new NIOSelectorQueueElement(ready[ready_offset++]);
}
public synchronized QueueElementIF[] blocking_dequeue_all(int timeout_millis) {
if (DEBUG) System.err.println("NIOSelectSource ("+name+"): blocking_dequeue_all called");
/
synchronized (blocker) {
if (selector.keys().size() == 0) {
if (DEBUG) System.err.println("!!!!no keys");
if (timeout_millis == 0) return null;
// Wait for something to be registered
if (timeout_millis == -1) {
try {
blocker.wait();
} catch (InterruptedException ie) {
}
} else {
try {
blocker.wait(timeout_millis);
} catch (InterruptedException ie) {
}
}
}
}
if ((ready_size == 0) || (ready_offset == ready_size)) {
doPoll(timeout_millis);
}
if (ready_size == 0) return null;
NIOSelectorQueueElement ret[] =
new NIOSelectorQueueElement[ready_size-ready_offset];
for (int i = 0; i < ret.length; i++) {
if (DEBUG) System.err.println("ret["+i+"] = " + ready[ready_offset]);
ret = new NIOSelectorQueueElement(ready[ready_offset++]);
}
return ret;
}
/**
 * Dequeues up to {@code num} ready selection keys, polling the selector if
 * the previously fetched batch is empty or fully consumed.
 *
 * If the selector has no registered keys, the call first waits on
 * {@code blocker} for a registration: not at all when timeout_millis is 0,
 * indefinitely when it is -1, otherwise for at most timeout_millis.
 *
 * @param timeout_millis 0 = do not block, -1 = block indefinitely, otherwise
 *                       the maximum time in milliseconds for the wait and for
 *                       the poll
 * @param num            maximum number of elements to return
 * @return an array of at most num wrapped keys, or null if none are available
 */
public synchronized QueueElementIF[] blocking_dequeue(int timeout_millis, int num) {
    if (DEBUG) System.err.println("NIOSelectSource ("+name+"): blocking_dequeue called");
    synchronized (blocker) {
        if (selector.keys().size() == 0) {
            if (timeout_millis == 0) return null;
            // Wait for something to be registered
            try {
                if (timeout_millis == -1) {
                    blocker.wait();
                } else {
                    blocker.wait(timeout_millis);
                }
            } catch (InterruptedException ie) {
                // An interrupted wait simply falls through to the poll below.
            }
        }
    }
    if ((ready_size == 0) || (ready_offset == ready_size)) {
        doPoll(timeout_millis);
    }
    // Nothing left to hand out (also keeps numtoret from going negative
    // should the offset ever have run past the batch size).
    if ((ready_size == 0) || (ready_offset >= ready_size)) return null;
    int numtoret = Math.min(ready_size - ready_offset, num);
    NIOSelectorQueueElement ret[] = new NIOSelectorQueueElement[numtoret];
    for (int i = 0; i < numtoret; i++) {
        // Fill slot i; the posted code had lost the [i] subscript.
        ret[i] = new NIOSelectorQueueElement(ready[ready_offset++]);
    }
    return ret;
}
/**
 * Runs one select on the selector and moves any selected keys into the
 * ready array (through balance()), resetting ready_offset and ready_size.
 * When nothing was selected, the ready state is cleared so callers never see
 * a stale, already-consumed batch.
 *
 * @param timeout 0 = non-blocking selectNow(), -1 = block indefinitely,
 *                otherwise the select timeout in milliseconds
 */
private void doPoll(int timeout) {
    if (DEBUG) System.err.println("NIOSelectSource ("+name+"): Doing poll, timeout "+timeout);
    synchronized (blocker) {
        try {
            // to correct for changed semantics in nio from nbio.
            // use selectNow to not block, and select(0) for indefinite block
            if (timeout == 0) {
                selector.selectNow();
            } else {
                if (timeout == -1) timeout = 0;
                selector.select(timeout);
            }
        } catch (IOException e) {
            // Previously swallowed without a trace; report it and carry on
            // with whatever keys are already in the selected set.
            System.err.println("NIOSelectSource ("+name+"): select failed: "+e);
        }
        Set skeys = selector.selectedKeys();
        if (skeys.size() > 0) {
            SelectionKey ret[] = new SelectionKey[skeys.size()];
            Iterator key_iter = skeys.iterator();
            int j = 0;
            while (key_iter.hasNext()) {
                ret[j++] = (SelectionKey)key_iter.next();
                // Removing through the iterator already takes the key out of
                // the selected set; no second remove is needed.
                key_iter.remove();
            }
            // XXX We can't get ret == null if doPoll() is synchronized with
            // deregister() - but I'm not sure I want to do that
            ready_offset = 0; ready_size = ret.length;
            balance(ret);
            return;
        }
        // Didn't get anything: drop the previous (fully consumed) batch.
        ready = null;
        ready_offset = 0;
        ready_size = 0;
    }
}
/**
 * Installs the freshly selected keys as the ready array. When balancing is
 * disabled or fewer than two keys were selected, the array is used as-is;
 * otherwise the keys are copied into a new array in an order driven by the
 * balancer_seq table, probing forward (with wrap-around) past entries that
 * have already been taken.
 *
 * @param selarr the selected keys; entries are nulled out as they are moved
 * @throws IllegalArgumentException if every entry of selarr is null while
 *         slots remain to be filled
 */
private void balance(SelectionKey selarr[]) {
    if (DEBUG) {
        System.err.println("NIOSelectSource ("+name+"): balance called, selarr size="+selarr.length);
        for (int i = 0; i < selarr.length; i++) {
            // Print each element; the posted code had lost the [i] subscript.
            System.err.println("!!!!selar["+i+"] = " + selarr[i]);
        }
    }
    if ((!do_balance) || (selarr.length < 2)) {
        ready = selarr;
    } else {
        SelectionKey a;
        ready = new SelectionKey[selarr.length];
        for (int i = 0; i < ready.length; i++) {
            if (balancer_seq_off == BALANCER_SEQUENCE_SIZE) {
                balancer_seq_off = 0;
            }
            // Mask off the sign bit so a negative table entry can never yield
            // a negative index.
            int n = (balancer_seq[balancer_seq_off++] & Integer.MAX_VALUE) % selarr.length;
            int c = 0;
            while (selarr[n] == null) {
                n++; c++;
                if (n == selarr.length) n = 0;
                if (c == selarr.length) {
                    System.err.println("WARNING: NIOSelectSource.balance(): All items in selarr are null (n="+n+", c="+c+", len="+selarr.length);
                    for (int k = 0; k < ready.length; k++) {
                        System.err.println("["+k+"] ready:"+ready[k]+" selarr:"+selarr[k]);
                    }
                    throw new IllegalArgumentException("balance: All items in selarr are null! This is a bug - please contact mdw@cs.berkeley.edu");
                }
            }
            if (DEBUG) System.err.println("NIOSelectSource: balance: "+n+"->"+i);
            // Move the chosen key into slot i; the posted code had lost the
            // [i] subscript and overwrote the array variable instead.
            a = selarr[n]; selarr[n] = null; ready[i] = a;
        }
    }
}
/**
 * Fills the balancer sequence table with BALANCER_SEQUENCE_SIZE
 * non-negative pseudo-random ints and resets the read offset to 0.
 */
private void initBalancer() {
    balancer_seq = new int[BALANCER_SEQUENCE_SIZE];
    Random r = new Random(); // XXX Need better seed?
    for (int i = 0; i < BALANCER_SEQUENCE_SIZE; i++) {
        // Clear the sign bit rather than use Math.abs(): Math.abs(Integer.MIN_VALUE)
        // is still negative. The posted code had also lost the [i] subscript.
        balancer_seq[i] = r.nextInt() & Integer.MAX_VALUE;
    }
    balancer_seq_off = 0;
}
Hi,
I am Sathish, working as a software engineer. Right now I am developing a Streaming Gateway Protocol using the java.nio package.
I am at an initial stage, so I thought you could help me with this.
My problem is that I am developing a UDP server that reads data from a client and sends it back to the client.
Please take a look at this code:
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
/**
 * Minimal non-blocking UDP echo server built on java.nio.
 *
 * A single DatagramChannel is bound to the requested port and registered for
 * OP_READ with one Selector that lives as long as the server. Each received
 * datagram is sent straight back to its sender.
 *
 * Fixes relative to the original posting:
 * - the Selector is created once and kept; opening a new, empty Selector in
 *   every loop iteration made select() block forever;
 * - only OP_READ is registered (OP_WRITE is almost always ready and OP_ACCEPT
 *   is not a valid operation for a DatagramChannel);
 * - datagrams are read with receive(), which works on an unconnected channel
 *   and reports the sender, instead of read();
 * - the constructor honours its port argument.
 */
public class UDPServer3
{
    // Fields from the original class. They are not used by the NIO code path
    // and are kept only so existing references to them still compile.
    String result = null;
    static final int packetSize = 1024;
    DatagramPacket packet;
    DatagramSocket socket;
    byte[] data;
    int clientPort;
    InetAddress address;
    String str;

    /** The buffer holding the most recently received datagram, or null before the first one. */
    private Object m_byteBuffer;

    /** The bound, non-blocking server channel. */
    private final DatagramChannel channel;

    /** The one selector the channel is registered with. */
    private final Selector selector;

    /** Sender of the most recently received datagram. */
    private java.net.SocketAddress lastSender;

    /** Receive buffer, reused for every datagram. */
    private ByteBuffer buffer = ByteBuffer.allocate(1024);

    /**
     * Opens the channel, binds it to the given port on all local addresses and
     * registers it for read events.
     *
     * @param port UDP port to listen on; 0 lets the system pick a free port
     * @throws IOException if the channel or selector cannot be opened or bound
     */
    public UDPServer3(int port) throws IOException
    {
        DatagramChannel ch = DatagramChannel.open();
        Selector sel = null;
        boolean ready = false;
        try {
            sel = Selector.open();
            ch.socket().bind(new InetSocketAddress(port));
            ch.configureBlocking(false);
            ch.register(sel, SelectionKey.OP_READ);
            ready = true;
        } finally {
            // Do not leak the descriptors if setup failed half-way.
            if (!ready) {
                ch.close();
                if (sel != null) {
                    sel.close();
                }
            }
        }
        channel = ch;
        selector = sel;
        System.out.println("Server Channel is accepted");
    }

    /**
     * Serves forever: waits for datagrams and echoes each one to its sender.
     *
     * @throws Exception if selecting, receiving or sending fails
     */
    public void run ()
    throws Exception
    {
        while (true)
        {
            serviceOnce(0L);
        }
    }

    /**
     * Performs one select and echoes every datagram that is ready.
     *
     * @param timeoutMillis maximum time to wait for a datagram; 0 blocks
     *                      until one arrives
     * @return the number of datagrams echoed during this call
     * @throws Exception if selecting, receiving or sending fails
     */
    int serviceOnce(long timeoutMillis) throws Exception
    {
        selector.select(timeoutMillis);
        int echoed = 0;
        Iterator it = selector.selectedKeys().iterator();
        while (it.hasNext())
        {
            SelectionKey key = (SelectionKey) it.next();
            // The selected-key set is never cleared by the selector itself.
            it.remove();
            if (!key.isValid() || !key.isReadable()) {
                continue;
            }
            DatagramChannel source = (DatagramChannel) key.channel();
            // Drain everything that is queued; receive() yields null when empty.
            while (readDataFromSocket(key) != null)
            {
                // Non-blocking send: if the socket send buffer is full the
                // datagram is dropped, which is acceptable for UDP.
                source.send(buffer, lastSender);
                echoed++;
            }
        }
        return echoed;
    }

    /** Returns the local port the server channel is bound to. */
    int localPort()
    {
        return channel.socket().getLocalPort();
    }

    /**
     * Puts a channel into non-blocking mode and registers it with a selector.
     * A null channel is ignored.
     *
     * @param selector the selector to register with
     * @param channel  the channel to register, may be null
     * @param ops      interest-op mask; must be valid for the channel type
     * @throws Exception if configuring or registering the channel fails
     */
    protected void registerChannel (Selector selector,SelectableChannel channel, int ops)
    throws Exception
    {
        if (channel == null) {
            return;
        }
        channel.configureBlocking (false);
        channel.register (selector, ops);
    }

    /**
     * Receives one datagram from the key's channel into the shared buffer and
     * remembers who sent it.
     *
     * @param key selection key whose channel is a DatagramChannel
     * @return the buffer, flipped and ready for reading, or null if no
     *         datagram was immediately available
     * @throws Exception if receiving fails
     */
    protected Object readDataFromSocket (SelectionKey key)throws Exception
    {
        DatagramChannel datagramChannel = (DatagramChannel) key.channel();
        buffer.clear();
        java.net.SocketAddress sender = datagramChannel.receive(buffer);
        if (sender == null) {
            return null;
        }
        buffer.flip();
        lastSender = sender;
        m_byteBuffer = buffer;
        return m_byteBuffer;
    }

    /**
     * Starts the echo server on port 8081.
     *
     * @param argv ignored
     * @throws Exception if the server fails with something other than an IOException
     */
    public static void main (String [] argv)
    throws Exception
    {
        int port = 8081;
        UDPServer3 udp = new UDPServer3(port);
        try
        {
            udp.run();
        }
        catch (IOException e)
        {
            e.printStackTrace();
            System.exit(-1);
        }
    }
}
Please run this program once and tell me where I am going wrong.