Java NIO

Hi guys,

I need to learn about the NIO package. Please suggest some useful URLs for understanding the basics.

[114 byte] By [ChandruMohana] at [2007-11-27 10:22:39]
# 1

http://www.telekinesis.com.au/wipv3_6/FundamentalNetworkinginJava.A21

ejpa at 2007-7-28 17:17:19 > top of Java-index,Core,Core APIs...
# 2

Hi,

Thanks for your reply.

Santhiyaa at 2007-7-28 17:17:19
# 3

Hi,

I am also looking into this topic. I am developing a client/server application using the NIO package. Do you have any sample server code for handling multiple clients with NIO?

Thanks in advance.

Santhiyaa at 2007-7-28 17:17:19
# 4

I use this in my servers, and it has worked perfectly for 2-3 years on many servers (mostly XML socket servers for Flash multiplayer games) that sometimes have over 500 users online at the same time.

I got the basic idea and the skeleton for the code on these forums, 2-3 years ago ;)

package bhadzialic.crex;

/*

* Copyright 2004-2007 Borut Hadžialić

Licensed under the Apache License, Version 2.0 (the "License");

you may not use this file except in compliance with the License.

You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software

distributed under the License is distributed on an "AS IS" BASIS,

WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

See the License for the specific language governing permissions and

limitations under the License.

*/

import java.util.*;

import java.io.*;

import java.nio.channels.*;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

/**

* Factory for ASockets and AServerSockets,

*

* and the place where ASockets and AServerSockets register their

* connect/read/write/accept interests.

*

* This class and its subclasses are internally synchronized and are safe to use

* from multiple concurrent threads.

*

* @author Borut Hadžialić borut.hadzialic@gmail.com

*/

public class AAdapter {

protected Selector selector;

/**

* This list consists of pairs of objects: - first object is an Integer

* specifying interest registration or deregistration - second object is an

* ASocket or AServerSocket which interest should be changed

*/

protected LinkedList interestRegistrationQueue;

/**

* A collection of ASockets that are waiting to be closed. The selector

* thread will periodically (after each selector.select() unblock) process

* this list, and do the following:

*

* 1. Close the java.net.Socket that the ASocket is using. 2. Cancel the

* SelectionKey that the ASocket is using. 3. Fire a close event on the

* ASocket.

*/

protected LinkedList socketCloseQueue;

/**

* A collection of AServerSockets that are waiting to be closed. The

* selector thread will periodically (after each selector.select() unblock)

* process this list, and do the following:

*

* 1. Close the java.net.ServerSocket that the ASocket is using. 2. Cancel

* the SelectionKey that the AServerSocket is using. 3. Fire a close event

* on the AServerSocket.

*/

protected LinkedList serverSocketCloseQueue;

/**

* Selector thread contains the selector.select loop, and event handling.

*/

protected SelectorThread selectorThread;

protected boolean selectorThreadRunPermision;

protected boolean selectorThreadCloseSignal;

protected final Log logger = LogFactory.getLog(getClass());

/*

* Interest ops codes We define ints for switch-case branches, and Integers

* to fill LinkedLists collections

*/

protected final int REGISTER_READ = 1;

protected final Integer OP_REGISTER_READ = new Integer(REGISTER_READ);

protected final int DEREGISTER_READ = 2;

protected final Integer OP_DEREGISTER_READ = new Integer(DEREGISTER_READ);

protected final int REGISTER_WRITE = 4;

protected final Integer OP_REGISTER_WRITE = new Integer(REGISTER_WRITE);

protected final int DEREGISTER_WRITE = 8;

protected final Integer OP_DEREGISTER_WRITE = new Integer(DEREGISTER_WRITE);

protected final int REGISTER_CONNECT = 16;

protected final Integer OP_REGISTER_CONNECT = new Integer(REGISTER_CONNECT);

protected final int DEREGISTER_CONNECT = 32;

protected final Integer OP_DEREGISTER_CONNECT = new Integer(

DEREGISTER_CONNECT);

protected final int REGISTER_ACCEPT = 64;

protected final Integer OP_REGISTER_ACCEPT = new Integer(REGISTER_ACCEPT);

protected final int DEREGISTER_ACCEPT = 128;

protected final Integer OP_DEREGISTER_ACCEPT = new Integer(

DEREGISTER_ACCEPT);

public AAdapter() throws IOException {

logger.trace("ENTRY");

selector = Selector.open();

interestRegistrationQueue = new LinkedList();

socketCloseQueue = new LinkedList();

serverSocketCloseQueue = new LinkedList();

selectorThreadRunPermision = false;

selectorThreadCloseSignal = false;

selectorThread = new SelectorThread();

selectorThread.start();

logger.trace("RETURN");

}

/* Accessors */

public Selector getSelector() {

return selector;

}

/* end Accessors */

/* Interest registration / deregistration */

public void registerReadInterest(ASocket socket) {

logger.trace("ENTRY");

synchronized (interestRegistrationQueue) {

interestRegistrationQueue.addLast(OP_REGISTER_READ);

interestRegistrationQueue.addLast(socket);

}

logger.trace("RETURN");

}

public void deregisterReadInterest(ASocket socket) {

logger.trace("ENTRY");

synchronized (interestRegistrationQueue) {

interestRegistrationQueue.addLast(OP_DEREGISTER_READ);

interestRegistrationQueue.addLast(socket);

}

logger.trace("RETURN");

}

public void registerWriteInterest(ASocket socket) {

logger.trace("ENTRY");

synchronized (interestRegistrationQueue) {

interestRegistrationQueue.addLast(OP_REGISTER_WRITE);

interestRegistrationQueue.addLast(socket);

}

logger.trace("RETURN");

}

public void deregisterWriteInterest(ASocket socket) {

logger.trace("ENTRY");

synchronized (interestRegistrationQueue) {

interestRegistrationQueue.addLast(OP_DEREGISTER_WRITE);

interestRegistrationQueue.addLast(socket);

}

logger.trace("RETURN");

}

public void registerConnectInterest(ASocket socket) {

logger.trace("ENTRY");

synchronized (interestRegistrationQueue) {

interestRegistrationQueue.addLast(OP_REGISTER_CONNECT);

interestRegistrationQueue.addLast(socket);

}

logger.trace("RETURN");

}

public void deregisterConnectInterest(ASocket socket) {

logger.trace("ENTRY");

synchronized (interestRegistrationQueue) {

interestRegistrationQueue.addLast(OP_DEREGISTER_CONNECT);

interestRegistrationQueue.addLast(socket);

}

logger.trace("RETURN");

}

public void registerAcceptInterest(AServerSocket socket) {

logger.trace("ENTRY");

synchronized (interestRegistrationQueue) {

interestRegistrationQueue.addLast(OP_REGISTER_ACCEPT);

interestRegistrationQueue.addLast(socket);

}

logger.trace("RETURN");

}

public void deregisterAcceptInterest(AServerSocket socket) {

logger.trace("ENTRY");

synchronized (interestRegistrationQueue) {

interestRegistrationQueue.addLast(OP_DEREGISTER_ACCEPT);

interestRegistrationQueue.addLast(socket);

}

logger.trace("RETURN");

}

/* End Interest registration / deregistration */

public void addToSocketCloseQueue(ASocket socket) {

logger.trace("ENTRY");

synchronized (socketCloseQueue) {

if (!socketCloseQueue.contains(socket))

socketCloseQueue.addLast(socket);

}

logger.trace("RETURN");

}

public void addToServerSocketCloseQueue(AServerSocket serverSocket) {

logger.trace("ENTRY");

synchronized (serverSocketCloseQueue) {

if (!serverSocketCloseQueue.contains(serverSocket))

serverSocketCloseQueue.addLast(serverSocket);

}

logger.trace("RETURN");

}

public void start() {

synchronized (selectorThread) {

selectorThreadRunPermision = true;

logger.info("Selector thread run permission granted.");

selectorThread.notify();

}

}

public void pause() {

synchronized (selectorThread) {

selectorThreadRunPermision = false;

logger.info("Selector thread run permission revoked.");

selector.wakeup();

}

}

// TODO think about how this method works

public void close() {

selectorThreadCloseSignal = true;

synchronized (selectorThread) {

if (selectorThreadRunPermision)

selector.wakeup();

else

start();

}

}

public String toString() { // Should put each line inside corresponding

// synchronized block

StringBuffer sb = new StringBuffer(256);

sb.append("AAdapter: @").append(hashCode());

sb.append("\n run permission: ").append(

Boolean.toString(selectorThreadRunPermision));

/*

* sb.append("\n read ops demand queue count: ").append(

* Integer.toString(readOpsDemandQueue.size())); sb.append("\n write ops

* demand queue count: ").append(

* Integer.toString(writeOpsDemandQueue.size())); sb.append("\n connect

* queue count: ").append( Integer.toString(connectQueue.size()));

* sb.append("\n listen queue count: ").append(

* Integer.toString(listenQueue.size()));

*/

sb.append("\n socket close queue count: ").append(

Integer.toString(socketCloseQueue.size()));

sb.append("\n server socket close queue count: ").append(

Integer.toString(serverSocketCloseQueue.size()));

return sb.toString();

}

protected class SelectorThread extends Thread {

protected SelectorThread() {

super("Selector-Thread");

}

public void run() {

logger.trace("ENTRY");

while (!selectorThreadCloseSignal) {

if (selectorThreadRunPermision) {

// Do thread work;

try {

logger

.debug("Start of selector loop - blocked on selector.select().");

// Block on selector.select() until an operation becomes

// available.

selector.select();

logger

.debug("Selector.select() deblocked and returned.");

// 1. PROCESS INTEREST REGISTRATION / DEREGISTRATION

boolean interestRegistrationEvents = false;

// irq - interest registration queue

Object[] irq = null;

synchronized (interestRegistrationQueue) {

if (interestRegistrationQueue.size() > 0) {

irq = (Object[]) interestRegistrationQueue

.toArray(new Object[interestRegistrationQueue

.size()]);

}

interestRegistrationQueue.clear();

}

if (irq != null) {

for (int i = 0; i < irq.length; i++) {

int interest = ((Integer) irq[i++]).intValue();

switch (interest) {

// REGISTER READ

case REGISTER_READ: {

ASocket socket = (ASocket) irq[i];

if (logger.isDebugEnabled()) {

logger

.debug("Processing REGISTER_READ request, for socket "

+ socket.toString());

}

SocketChannel sc = socket.getChannel();

if (!sc.isConnected() || !sc.isOpen()) {

logger

.debug("Socket closed, or not connected. Adding socket to close queue");

addToSocketCloseQueue(socket);

selector.wakeup();

continue;

}

try {

logger

.debug("Registering socket read interest with selector");

SelectionKey sk = sc.keyFor(selector);

if (sk == null) {

sk = sc.register(selector,

SelectionKey.OP_READ,

socket);

} else {

sk.interestOps(sk.interestOps()

| SelectionKey.OP_READ);

}

sk.attach(socket);

logger

.debug("Successfully registered socket read interest");

try {

// Try to read immediately.

socket.read();

} catch (Exception e) {

logger

.warn(

"Socket read exception (at immediate read attempt - the one after read ops registration)",

e);

}

} catch (Exception ioe) {

logger

.debug(

"Exception occurred while registering socket read interest",

ioe);

}

}

break;

// DEREGISTER READ

case DEREGISTER_READ: {

ASocket socket = (ASocket) irq[i];

if (logger.isDebugEnabled()) {

logger

.debug("Processing DEREGISTER_READ request, for socket "

+ socket.toString());

}

SocketChannel sc = socket.getChannel();

if (!sc.isConnected() || !sc.isOpen()) {

logger

.debug("Socket closed, or not connected. Adding socket to close queue");

addToSocketCloseQueue(socket);

selector.wakeup();

continue;

}

try {

logger

.debug("Deregistering socket read interest with selector");

SelectionKey sk = sc.keyFor(selector);

if (sk == null) {

} else {

sk

.interestOps(sk

.interestOps()

& (Integer.MAX_VALUE - SelectionKey.OP_READ));

sk.attach(socket);

}

logger

.debug("Successfully deregistered socket read interest");

} catch (Exception ioe) {

logger

.debug(

"Exception occurred while deregistering socket read interest",

ioe);

}

}

break;

case REGISTER_WRITE: {

ASocket socket = (ASocket) irq[i];

if (logger.isDebugEnabled()) {

logger

.debug("Processing REGISTER_WRITE request, for socket "

+ socket.toString());

}

SocketChannel sc = socket.getChannel();

if (!sc.isConnected() || !sc.isOpen()) {

logger

.debug("Socket closed, or not connected. Adding socket to close queue");

addToSocketCloseQueue(socket);

selector.wakeup();

continue;

}

try {

logger

.debug("Registering socket write interest");

SelectionKey sk = sc.keyFor(selector);

if (sk == null) {

sk = sc.register(selector,

SelectionKey.OP_WRITE,

socket);

} else {

sk.interestOps(sk.interestOps()

| SelectionKey.OP_WRITE);

}

sk.attach(socket);

logger

.debug("Successfully registered socket write interest");

try {

// Try to write immediately.

socket.write();

} catch (Exception e) {

logger

.warn(

"Socket write exception (at immediate write attempt - the one after write ops registration)",

e);

}

} catch (Exception ioe) {

logger

.debug(

"Exception occurred while registering socket write interest",

ioe);

}

}

break;

case DEREGISTER_WRITE: {

ASocket socket = (ASocket) irq[i];

if (logger.isDebugEnabled()) {

logger

.debug("Processing DEREGISTER_WRITE request, for socket "

+ socket.toString());

}

SocketChannel sc = socket.getChannel();

if (!sc.isConnected() || !sc.isOpen()) {

logger

.debug("Socket closed, or not connected. Adding socket to close queue");

addToSocketCloseQueue(socket);

selector.wakeup();

continue;

}

try {

logger

.debug("Deregistering socket write interest");

SelectionKey sk = sc.keyFor(selector);

if (sk == null) {

} else {

sk

.interestOps(sk

.interestOps()

& (Integer.MAX_VALUE - SelectionKey.OP_WRITE));

sk.attach(socket);

}

logger

.debug("Successfully deregistered socket write interest");

} catch (Exception ioe) {

logger

.debug(

"Exception occurred while deregistering socket write interest",

ioe);

}

}

break;

case REGISTER_CONNECT: {

ASocket socket = (ASocket) irq[i];

if (logger.isDebugEnabled()) {

logger

.debug("Processing REGISTER_CONNECT request, for socket "

+ socket.toString());

}

SocketChannel sc = socket.getChannel();

if (!sc.isOpen()) {

logger

.debug("Socket closed, adding socket to close queue");

addToSocketCloseQueue(socket);

selector.wakeup();

continue;

}

try {

logger

.debug("Registering socket connect interest");

SelectionKey sk = sc.keyFor(selector);

if (sk == null) {

sk = sc.register(selector,

SelectionKey.OP_CONNECT,

socket);

} else {

sk.interestOps(sk.interestOps()

| SelectionKey.OP_CONNECT);

}

sk.attach(socket);

logger

.debug("Successfully registered socket connect interest");

} catch (Exception ioe) {

logger

.debug(

"Exception occurred while registering socket connect interest",

ioe);

}

}

break;

case DEREGISTER_CONNECT: {

ASocket socket = (ASocket) irq[i];

if (logger.isDebugEnabled()) {

logger

.debug("Processing DEREGISTER_CONNECT request, for socket "

+ socket.toString());

}

SocketChannel sc = socket.getChannel();

if (!sc.isOpen()) {

logger

.debug("Socket closed, adding socket to close queue");

addToSocketCloseQueue(socket);

selector.wakeup();

continue;

}

try {

logger

.debug("Deregistering socket connect interest");

SelectionKey sk = sc.keyFor(selector);

if (sk == null) {

} else {

sk

.interestOps(sk

.interestOps()

& (Integer.MAX_VALUE - SelectionKey.OP_CONNECT));

sk.attach(socket);

}

logger

.debug("Successfully deregistered socket connect interest");

} catch (Exception ioe) {

logger

.debug(

"Exception occurred while deregistering socket connect interest",

ioe);

}

}

break;

// TODO Check ACCEPT logic

case REGISTER_ACCEPT: {

AServerSocket serverSocket = (AServerSocket) irq[i];

if (logger.isDebugEnabled()) {

logger

.debug("Processing REGISTER_ACCEPT request, for server socket "

+ serverSocket

.toString());

}

ServerSocketChannel ssc = serverSocket

.getServerChannel();

if (!ssc.isOpen() || !serverSocket.isOpen()) {

logger

.debug("Server socket closed, adding server socket to close queue");

addToServerSocketCloseQueue(serverSocket);

selector.wakeup();

continue;

}

try {

logger

.debug("Registering server socket accept interest");

SelectionKey sk = ssc.register(

selector,

SelectionKey.OP_ACCEPT,

serverSocket);

sk.attach(serverSocket);

logger

.debug("Successfully registered server socket accept interest");

} catch (Exception ioe) {

logger

.debug(

"Exception occurred while registering server socket accept interest",

ioe);

}

}

break;

case DEREGISTER_ACCEPT: {

AServerSocket serverSocket = (AServerSocket) irq[i];

if (logger.isDebugEnabled()) {

logger

.debug("Processing DEREGISTER_ACCEPT request, for server socket "

+ serverSocket

.toString());

}

ServerSocketChannel ssc = serverSocket

.getServerChannel();

if (!ssc.isOpen() || !serverSocket.isOpen()) {

logger

.debug("Server socket closed, adding server socket to close queue");

addToServerSocketCloseQueue(serverSocket);

selector.wakeup();

continue;

}

try {

logger

.debug("Deregistering server socket accept interest");

SelectionKey sk = ssc.keyFor(selector);

if (sk == null) {

} else {

sk

.interestOps(sk

.interestOps()

& (Integer.MAX_VALUE - SelectionKey.OP_ACCEPT));

sk.attach(serverSocket);

}

logger

.debug("Successfully deregistered server socket accept interest");

} catch (Exception ioe) {

logger

.debug(

"Exception occurred while deregistering server socket accept interest",

ioe);

}

}

break;

}

}

interestRegistrationEvents = true;

}

// 2. PROCESS OPS EVENTS

// Temporary solution for avoiding too many reads and

// too little writes

int cycleReadsMax = 10;

int cycleReads = 0;

// Process selected keys.

Iterator it = null;

it = selector.selectedKeys().iterator();

while (it.hasNext()) {

SelectionKey sk = (SelectionKey) it.next();

it.remove();

if (!interestRegistrationEvents) { // Skip ops

// events, if

// there were

// interest

// events in

// this

// selector.select()

// loop

// iteration

// CONNECTABLE

if (sk.isConnectable()) {

ASocket socket = (ASocket) sk.attachment();

logger

.debug("A socket in selectedKeys set is connectable");

sk.cancel();

SocketChannel sc = socket.getChannel();

if (!socket.getChannel().isOpen()) {

logger

.debug("Socket closed, adding socket to close queue");

addToSocketCloseQueue(socket);

selector.wakeup();

continue;

}

try {

sc.finishConnect();

logger

.debug("Socket connect finished trough finishConnect()");

deregisterConnectInterest(socket);

if (socket.getReadEnabled())

registerReadInterest(socket);

if (socket.getWriteEnabled())

registerWriteInterest(socket);

selector.wakeup();

socket.fireASocketConnected();

} catch (Exception ioe) {

logger

.debug(

"Socket connect failed trough finishConnect()",

ioe);

addToSocketCloseQueue(socket);

selector.wakeup();

continue;

}

continue;

}

// READABLE

if (sk.isReadable()) {

if (cycleReads < cycleReadsMax) {

ASocket socket = (ASocket) sk

.attachment();

logger

.debug("A socket in selectedKeys set is readable");

if (!socket.getChannel().isOpen()

|| !socket.getChannel()

.isConnected()) {

logger

.debug("Socket closed, or not connected. Adding socket to close queue");

addToSocketCloseQueue(socket);

selector.wakeup();

continue;

}

try {

socket.read();

} catch (Exception e) {

logger.warn(

"Socket read exception", e);

}

cycleReads += 1;

}

}

// WRITABLE

if (sk.isWritable()) {

ASocket socket = (ASocket) sk.attachment();

logger

.debug("A socket in selectedKeys set is writable");

if (!socket.getChannel().isOpen()

|| !socket.getChannel()

.isConnected()) {

logger

.debug("Socket closed, or not connected. Adding socket to close queue");

addToSocketCloseQueue(socket);

selector.wakeup();

continue;

}

try {

socket.write();

} catch (Exception e) {

logger

.warn("Socket write exception",

e);

}

}

// TODO

// ACCEPTABLE

if (sk.isAcceptable()) {

AServerSocket serverSocket = (AServerSocket) sk

.attachment();

logger

.debug("A socket in selectedKeys set is acceptable");

if (!serverSocket.getServerChannel()

.isOpen()

|| !serverSocket.isOpen()) {

logger

.debug("Server socket closed, adding server socket to close queue");

addToServerSocketCloseQueue(serverSocket);

continue;

}

try {

SocketChannel socketChannel = serverSocket

.getServerChannel().accept();

socketChannel.configureBlocking(false);

serverSocket

.fireAServerSocketAccepted(socketChannel);

} catch (Exception ioe) {

logger

.debug("Exception occurred in socket accepting process");

serverSocket

.fireAServerSocketException(ioe);

continue;

}

}

} else { // Interest ops triggered loop - skip

// event ops

}

} // while(selectedKeys.hasNext())

// 3. PROCESS CLOSE QUEUES

// TODO

// Process closeQueue.

ASocket[] scq = null;

synchronized (socketCloseQueue) {

if (socketCloseQueue.size() > 0) {

scq = (ASocket[]) socketCloseQueue

.toArray(new ASocket[socketCloseQueue

.size()]);

}

socketCloseQueue.clear();

}

if (scq != null) {

for (int i = 0; i < scq.length; i++) {

ASocket socket = scq[i];

logger

.debug("Processing a socket from socketCloseQueue list");

try {

socket.close();

} catch (IOException ioe) {

logger

.debug(

"Exception occurred while closing socket",

ioe);

socket.fireASocketException(ioe);

}

}

}

// TODO

// Process serverSocketCloseQueue

AServerSocket[] sscq = null;

synchronized (serverSocketCloseQueue) {

if (serverSocketCloseQueue.size() > 0) {

sscq = (AServerSocket[]) serverSocketCloseQueue

.toArray(new AServerSocket[serverSocketCloseQueue

.size()]);

}

serverSocketCloseQueue.clear();

}

if (sscq != null) {

for (int i = 0; i < sscq.length; i++) {

AServerSocket socket = sscq[i];

logger

.debug("Processing a socket from serverSocketCloseQueue list");

try {

socket.close();

logger.debug("Server socket closed");

} catch (IOException ioe) {

logger

.debug(

"Exception occurred while closing server socket",

ioe);

socket.fireAServerSocketException(ioe);

}

}

}

// TODO

} catch (IOException ioe) {

logger

.fatal(

"Unexpected IOException from the selector thread",

ioe);

System.exit(0);

} catch (Exception e) {

logger

.fatal(

"Unexpected Exception from the selector thread",

e);

System.exit(0);

} catch (Error error) {

logger.fatal(

"Unexpected Error from the selector thread",

error);

System.exit(0);

}

} else { // Put the selector thread in its own wait queue.

synchronized (this) {

try {

logger

.debug("Selector thread paused - waiting for run permission.");

wait();

} catch (InterruptedException ie) {

ie.printStackTrace();

}

}

}

} // end while (!selectorThreadCloseSignal)

logger.trace("RETURN");

} // end run()

}

}

boruthadzialica at 2007-7-28 17:17:19
# 5

I'd like to add two comments to the code posted previously.

The comment on the class stating that it is a factory for socket objects is not correct; I forgot to remove it after I refactored the class.

Also, those System.exit(0) statements near the end of the code are probably a bad idea: you probably don't want your JVM to exit because some exception occurred in the selector thread.

boruthadzialica at 2007-7-28 17:17:19
# 6

Hi,

Amazing. Thanks for your valuable reply; it should help me. I will look at the code, and if I have any doubts I will let you know. Please clarify them if so.

Thanks a lot.

Santhiyaa at 2007-7-28 17:17:19
# 7

The code is too complicated for me to understand. I am only at a basic stage.

Do you have any simple code for file transfer using the NIO package?

Thanks in advance.

Santhiyaa at 2007-7-28 17:17:19
# 8

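// Fragment of a selector loop. Assumes an open Selector named "selector"
// and imports of java.util.Iterator and java.nio.channels.*.
while (true) {
    try {
        selector.select();

        // Process selected keys.
        Iterator it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey sk = (SelectionKey) it.next();
            it.remove();

            // A key may have been cancelled since it was selected.
            if (!sk.isValid()) {
                continue;
            }

            // READABLE
            if (sk.isReadable()) {
                // sk.channel() always returns the registered channel,
                // so nothing needs to be attached to the key for this.
                SocketChannel socket = (SocketChannel) sk.channel();
                // Do some reading, e.g. socket.read(buffer);
            }

            // WRITABLE
            if (sk.isValid() && sk.isWritable()) {
                SocketChannel socket = (SocketChannel) sk.channel();
                // Do some writing, e.g. socket.write(buffer);
                // If you are done writing, deregister write interest.
            }

            // ACCEPTABLE
            if (sk.isValid() && sk.isAcceptable()) {
                ServerSocketChannel serverSocket = (ServerSocketChannel) sk.channel();
                // accept() returns null if no connection is pending.
                SocketChannel socketChannel = serverSocket.accept();
                if (socketChannel != null) {
                    socketChannel.configureBlocking(false);
                    // Do something with the new socketChannel.
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}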

The previous code has examples of how to register and deregister connect/accept/read/write interest on a SocketChannel.

boruthadzialica at 2007-7-28 17:17:19
# 9

Hi,

Thanks for posting the code. Does the server code handle requests from multiple clients at the same time?

Thanks a lot in advance.

Santhiyaa at 2007-7-28 17:17:19
# 10

> Hi,

> Thanks for posting the code. Is the server code

> handles multiple clients request at the same time ?

>

> advanced thanx lot.

I just posted some code that can give you some ideas about how to use NIO to build your own server. It's not the complete code of a server.

boruthadzialica at 2007-7-28 17:17:19
# 11

> socketCloseQueue = new LinkedList();

> serverSocketCloseQueue = new LinkedList();

You don't need these queues or the associated APIs at all. Just close the associated channels. Channel closes are already queued inside the selector.
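
To illustrate the point about closing, here is a minimal sketch (the class and method names are made up for this example, and a Pipe stands in for a socket so it runs without a network). It registers a channel, closes it, and checks that the key is invalid straight away and gone from the selector after the next selection pass.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class, not part of the code posted above.
final class CloseCancelsKey {
    /** Returns true if closing the channel invalidated its key and the selector dropped it. */
    static boolean closeInvalidatesKey() throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                Pipe.SourceChannel source = pipe.source();
                source.configureBlocking(false);
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);

                source.close();                    // no close queue: closing cancels the key
                boolean invalidated = !key.isValid();

                selector.selectNow();              // cancelled keys are removed during selection
                return invalidated && selector.keys().isEmpty();
            } finally {
                pipe.source().close();
                pipe.sink().close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(closeInvalidatesKey());
    }
}
```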

>// TODO think about how this method works

>public void close() {

>selectorThreadCloseSignal = true;

>synchronized (selectorThread) {

>if (selectorThreadRunPermision)

>selector.wakeup();

>else

>start();

Why on earth would a close method restart the selector?

> selector.select();

You should always use a timeout here, and do idle-processing if the timeout happens, i.e. if the number of selected channels is zero.
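
A rough sketch of what a timed select with idle processing could look like. The names TimedSelect, selectOnce and onIdle are invented for the example; what the idle hook does (expiring stale connections, for instance) is up to the application.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

// Hypothetical helper, shown only to illustrate select(timeout).
final class TimedSelect {
    /**
     * Runs one pass of the loop. Returns the number of keys handled,
     * or 0 if nothing was selected and the idle hook ran instead.
     */
    static int selectOnce(Selector selector, long timeoutMillis, Runnable onIdle)
            throws IOException {
        int ready = selector.select(timeoutMillis);
        if (ready == 0) {
            // Timeout expired (or the selector was woken up) with nothing to do.
            onIdle.run();
            return 0;
        }
        int handled = 0;
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            if (!key.isValid()) {
                continue;
            }
            handled++;   // dispatch on key.readyOps() here
        }
        return handled;
    }
}
```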

> selector.wakeup();

All the Selector.wakeup() calls in this event loop are pointless, as the Selector is not currently selecting.

> selector.wakeup();

Ditto

> interestOps() & (Integer.MAX_VALUE - SelectionKey.OP_READ));

interestOps() & ~SelectionKey.OP_READ
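
For reference, a small sketch of adding and clearing interest bits with plain bit operations. InterestBits and its method names are invented for this example. For the valid operation sets, the Integer.MAX_VALUE subtraction gives the same result as the complement; the complement is simply the conventional way to write it.

```java
import java.nio.channels.SelectionKey;

// Hypothetical helper class for illustration only.
final class InterestBits {
    /** Returns ops with the given operation bit set. */
    static int with(int ops, int op) {
        return ops | op;
    }

    /** Returns ops with the given operation bit cleared. */
    static int without(int ops, int op) {
        return ops & ~op;
    }

    /** Clears one operation from a key's interest set in place. */
    static void clear(SelectionKey key, int op) {
        key.interestOps(without(key.interestOps(), op));
    }
}
```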

> selector.wakeup();

See above.

> selector.wakeup();

Ditto

> interestOps()& (Integer.MAX_VALUE - SelectionKey.OP_WRITE));

See above

> selector.wakeup();

See above.

> selector.wakeup();

See above.

> interestOps() & (Integer.MAX_VALUE - SelectionKey.OP_CONNECT));

See above.

> selector.wakeup();

See above.

> selector.wakeup();

Ditto.

> interestOps() & (Integer.MAX_VALUE - SelectionKey.OP_ACCEPT));

See above.

> // Temporary solution for avoiding too many reads and

>// too little writes

The correct solution to this is to never register a channel for OP_READ and OP_WRITE at the same time. If you have something to write to the channel, write it all before you start reading the channel again. The other part of the solution is to process OP_WRITE before OP_READ.
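
One way to sketch this rule, assuming each connection keeps its unsent output in a single ByteBuffer that is already flipped for reading (that layout, and the names WriteThenRead, interestFor and flush, are assumptions made for the example):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

// Hypothetical helper: the key is interested in exactly one of OP_WRITE or OP_READ.
final class WriteThenRead {
    /** OP_WRITE while output is still pending, otherwise OP_READ. */
    static int interestFor(ByteBuffer pendingOutput) {
        return pendingOutput.hasRemaining() ? SelectionKey.OP_WRITE : SelectionKey.OP_READ;
    }

    /** Call when the key is writable, or right after queuing output. */
    static void flush(SelectionKey key, ByteBuffer pendingOutput) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        channel.write(pendingOutput);                 // may write only part of the buffer
        key.interestOps(interestFor(pendingOutput));  // keep writing, or go back to reading
    }
}
```

In the event loop, the writable branch would then be checked before the readable one.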

>// Process selected keys.

Doing the operations in this sequence causes a lot of unnecessary problems; in fact, it probably motivates all the queues. You should process the queues before calling select(), not after it.
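
A minimal sketch of that ordering, assuming changes requested by other threads are queued as Runnables (SelectorTasks, submit and runOnce are names invented here). Note that wakeup() is called from the submitting thread, which is the one place where it has an effect.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper: pending changes are applied before each select().
final class SelectorTasks {
    private final Selector selector;
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();

    SelectorTasks(Selector selector) {
        this.selector = selector;
    }

    /** Called from any thread: queue the change, then wake the selector thread. */
    void submit(Runnable change) {
        pending.add(change);
        selector.wakeup();
    }

    /** One pass of the selector thread's loop: drain first, then select. */
    int runOnce(long timeoutMillis) throws IOException {
        Runnable change;
        while ((change = pending.poll()) != null) {
            change.run();        // e.g. key.interestOps(...) or channel.register(...)
        }
        int ready = selector.select(timeoutMillis);
        // ... iterate selector.selectedKeys() here ...
        return ready;
    }
}
```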

>SelectionKey sk = (SelectionKey) it.next();

>it.remove();

At this point you should do:

if (!sk.isValid())

continue;

> if (!interestRegistrationEvents) { // Skip

See above. Moving the queue processing before the select() call will eliminate the need for this test.

> if (sk.isConnectable()) {

> ASocket socket =

> (ASocket) sk.attachment();

> logger

> .debug("A socket in selectedKeys set is

> connectable");

>sk.cancel();

What's this for? You should be registering this channel for OP_WRITE. But if this is a server why will it have outbound connections at all?

> SocketChannel sc = socket.getChannel();

> if (!socket.getChannel().isOpen()) {

This condition is impossible.

> selector.wakeup();

See above.

>sc.finishConnect();

At this point you should test to see whether this returns 'true' and react accordingly.

> .debug("Socket connect finished trough finishConnect()");

Not necessarily! see above.

> deregisterConnectInterest(socket);

You've already cancelled the selection key, why do this too? and as you shouldn't have cancelled the key, why not just call interestOps() inline here?
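
A sketch of handling a connectable key along those lines: check the result of finishConnect() and swap the interest set inline instead of cancelling the key. ConnectHandler, onConnectable and the nextOps parameter are invented for the example; which operation to register next is left to the caller.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

// Hypothetical handler for keys that report isConnectable().
final class ConnectHandler {
    /**
     * Returns true once the connection is established. False means the connect
     * is still in progress and OP_CONNECT stays registered. A failed connect
     * surfaces as an IOException from finishConnect().
     */
    static boolean onConnectable(SelectionKey key, int nextOps) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        if (!channel.finishConnect()) {
            return false;
        }
        // Replace OP_CONNECT with the next interest directly on the key.
        key.interestOps((key.interestOps() & ~SelectionKey.OP_CONNECT) | nextOps);
        return true;
    }
}
```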

> if (socket.getReadEnabled())

>

> registerReadInterest(socket);

> if

> (socket.getWriteEnabled())

>

> registerWriteInterest(socket);

See above. Don't do both these things at the same time. And again, why not just call interestOps() inline and avoid all the inefficiencies of queueing this operation?

> selector.wakeup();

See above.

> selector.wakeup();

See above.

> if (sk.isReadable()) {

// ...

> if (!socket.getChannel().isOpen()

> ||

> !socket.getChannel()

>

> isConnected()) {

Neither of these conditions is possible if sk.isReadable() is true, unless you have other threads closing the socket, which you shouldn't have. And a socket which has ever been connected always returns true from isConnected() so this test is doubly pointless.

> selector.wakeup();

See above.

> if (sk.isWritable()) {

// ...

> if (!socket.getChannel().isOpen()

> ||

> !socket.getChannel()

>

> isConnected()) {

See above. Both tests are pointless.

> selector.wakeup();

See above. Pointless.

>if (sk.isAcceptable()) {

// ...

> if

> (!serverSocket.getServerChannel()

>

> isOpen()

> ||

> !serverSocket.isOpen()) {

See above. Both tests are pointless.

>SocketChannel socketChannel = serverSocket.getServerChannel().accept();

At this point you should test for null.
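
A sketch of an accept handler with that null test. AcceptHandler and onAcceptable are invented names, and registering the new channel for OP_READ on the same selector is just one plausible choice.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical handler for keys that report isAcceptable().
final class AcceptHandler {
    /** Returns the accepted channel, or null if no connection was pending. */
    static SocketChannel onAcceptable(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel client = server.accept();
        if (client == null) {
            return null;     // non-blocking accept found nothing to accept
        }
        client.configureBlocking(false);
        client.register(key.selector(), SelectionKey.OP_READ);
        return client;
    }
}
```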

> // 3. PROCESS CLOSE QUEUES

Pointless. Just close the channels when you need to. Closing is already queued inside the selector.

Basically you can eliminate about 3/4 of this code.

ejpa at 2007-7-28 17:17:19
# 12

> The code is very complicated to understand me. I am

> only in basic stage.

That's because it's a complete mess.

Anything with such hideous nested and branching conditional statements is fundamentally flawed.

jwentinga at 2007-7-28 17:17:19
# 13

Can anyone give a simple sample client/server application developed with the NIO package?

Thanks in advance.

Santhiyaa at 2007-7-28 17:17:19
# 14

Read right through this thread:

http://forum.java.sun.com/thread.jspa?threadID=459338

Lots of good advice there.

ejpa at 2007-7-28 17:17:19