Multithreaded NIO Server crash, Client Connection reset

I am coding an NIO-based server and a plain blocking-socket client. We need to support 40K client connections. On the server, one main selector thread accepts client connections and, in turn, launches a separate selector thread for every 1000 connections to handle read and write operations. Currently I register only OP_READ for each channel. The server accepts connections up to 20K, but when I increase the client count in increments of 2000, the server program crashes midway (after about 24K) while the connections are being established. There is no exception or error on the server side; the process simply ends. On the client side I get the exceptions below. I am not setting SO_LINGER or SO_TIMEOUT. The server runs on Solaris 8 with these JVM settings: java -Xms2048m -Xmx2048m -Xss128k. Any tips or suggestions would be appreciated.

java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:168)
    at sun.nio.cs.StreamDecoder$CharsetSD.readBytes(StreamDecoder.java:408)
    at sun.nio.cs.StreamDecoder$CharsetSD.implRead(StreamDecoder.java:450)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:182)
    at java.io.InputStreamReader.read(InputStreamReader.java:167)
    at java.io.BufferedReader.fill(BufferedReader.java:136)
    at java.io.BufferedReader.read1(BufferedReader.java:187)
    at java.io.BufferedReader.read(BufferedReader.java:261)
    at java.io.Reader.read(Reader.java:100)
    at BlockingClientReaderThread.readAtClient(BlockingClientReaderThread.java:25)
    at BlockingClientReaderThread.run(BlockingClientReaderThread.java:15)
    at java.lang.Thread.run(Thread.java:534)

java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:305)
    at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:171)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:158)
    at java.net.Socket.connect(Socket.java:461)
    at SampleBlockingClient.run(SampleBlockingClient.java:52)
    at java.lang.Thread.run(Thread.java:534)

shannaraa at 2007-11-26 19:19:56
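The architecture described in the question, one thread that accepts connections and hands each one to a worker selector thread, can be sketched as follows. This is a minimal sketch in Java 7 or later, not the poster's code: the class names, the round-robin assignment, the loopback bind address and the queue-plus-`wakeup()` hand-off are assumptions chosen for illustration. The hand-off is there because, at least on older JDKs, `SelectableChannel.register` may block while another thread is inside `select()` on the same selector; queuing the channel and waking the worker lets the worker register it on its own thread. The acceptor also uses a blocking `accept()`, so the listening channel is never registered with any worker selector.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class AcceptorWithWorkers {

    /** A selector thread owning a share of the connections (hypothetical helper). */
    static class Worker implements Runnable {
        final Selector selector;
        final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();
        final AtomicInteger registered = new AtomicInteger();
        private final ByteBuffer buf = ByteBuffer.allocate(1024);

        Worker() throws IOException {
            selector = Selector.open();
        }

        /** Called on the acceptor thread: queue the channel, then wake this worker's select(). */
        void hand(SocketChannel ch) {
            pending.add(ch);
            selector.wakeup();
        }

        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    selector.select(); // may return 0 after wakeup(); that is not a reason to stop
                    SocketChannel ch;
                    while ((ch = pending.poll()) != null) {
                        try {
                            ch.configureBlocking(false);
                            ch.register(selector, SelectionKey.OP_READ); // on the selecting thread
                            registered.incrementAndGet();
                        } catch (IOException e) {
                            closeQuietly(ch);
                        }
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid() || !key.isReadable()) continue;
                        SocketChannel client = (SocketChannel) key.channel();
                        try {
                            buf.clear();
                            if (client.read(buf) < 0) closeQuietly(client); // peer closed
                        } catch (IOException e) {
                            closeQuietly(client); // e.g. connection reset by peer
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace(); // the selector itself failed
            }
        }
    }

    static void closeQuietly(Channel ch) {
        try {
            ch.close(); // closing a channel also cancels its selection keys
        } catch (IOException ignored) {
        }
    }

    final ServerSocketChannel server;
    final Worker[] workers;

    AcceptorWithWorkers(int port, int workerCount) throws IOException {
        server = ServerSocketChannel.open();
        server.socket().bind(new InetSocketAddress("127.0.0.1", port));
        workers = new Worker[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = new Worker();
            Thread t = new Thread(workers[i], "worker-" + i);
            t.setDaemon(true);
            t.start();
        }
    }

    /** Blocking accept loop; the listening channel is never registered with a selector. */
    void acceptLoop() throws IOException {
        int next = 0;
        while (server.isOpen()) {
            SocketChannel ch = server.accept(); // blocking mode: waits for the next client
            workers[next].hand(ch);
            next = (next + 1) % workers.length; // round-robin rather than fixed blocks of 1000
        }
    }
}
```

A server would call `acceptLoop()` on its main thread. Because that call blocks, the main thread, which is not a daemon, keeps the JVM running while the daemon worker threads serve the connections.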
# 1

I thought I would post some code snippets here. The server has 4 classes and the client has 3; the client launches a new thread for every client connection and uses blocking I/O. The server runs on one machine and the clients on another. I am pasting the entire code for your convenience. Please let me know if there is any problem with this approach to serving so many thousands of clients. I add clients incrementally by starting a new client program for every 5000 connections.

-

First One

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Vector;

public class SelectorTestServer {

    private static int clientKCount = 70;   // 70 read selectors and 70 write selectors
    private Vector readSelectorVec = new Vector();
    private Vector writeSelectorVec = new Vector();
    private ServerSocketChannel ssch = null;
    private SelectorThread readThread = null;
    private WriteThread writeThread = null;

    protected void startListening() {
        try {
            Selector readSelector = null;
            Selector writeSelector = null;
            ssch = ServerSocketChannel.open();
            ssch.configureBlocking(false);
            InetSocketAddress isa = new InetSocketAddress("10.148.13.12", 5555);
            ssch.socket().bind(isa);
            for (int i = 0; i < clientKCount; i++) {
                readSelector = SelectorProvider.provider().openSelector();
                writeSelector = SelectorProvider.provider().openSelector();
                // the listening channel is registered for OP_ACCEPT with every selector
                ssch.register(readSelector, SelectionKey.OP_ACCEPT);
                ssch.register(writeSelector, SelectionKey.OP_ACCEPT);
                readSelectorVec.add(readSelector);
                writeSelectorVec.add(writeSelector);
            }
        }
        catch (Exception e) {
            System.out.println("Caught here 1");
            e.printStackTrace();
        }
        catch (Throwable t) {
            System.out.println("Caught here 2");
            t.printStackTrace();
        }
        System.out.println("-");
        System.out.println("Launching from first Set of Selector and ReadWrite Threads");
        System.out.println("-");
        this.writeThread = new WriteThread((Selector) writeSelectorVec.elementAt(0), 1);
        this.writeThread.setDaemon(true);   // daemon thread: does not keep the JVM alive on its own
        this.writeThread.start();
        this.readThread = new SelectorThread(readSelectorVec, writeSelectorVec);
        this.readThread.setDaemon(true);
        this.readThread.start();
    }

    public static void main(String argv[]) {
        SelectorTestServer s = new SelectorTestServer();
        s.startListening();
        try {
            Thread.currentThread().sleep(500000);   // main sleeps for 500 seconds, then returns
        }
        catch (Exception e) {
            System.err.println(e.toString());
        }
        catch (Throwable t) {
            System.out.println("Caught 3");
            t.printStackTrace();
        }
    }
}

--

Second

--

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.Vector;

public class SelectorThread extends Thread {

    private int clientConnCount = 1;
    private Selector readSelector = null;
    private Selector writeSelector = null;
    private Vector readSelectorVector = new Vector();
    private Vector writeSelectorVector = new Vector();

    public SelectorThread(Vector readSelec, Vector writeSelec) {
        super("SelectorThread");
        this.readSelectorVector = readSelec;
        this.writeSelectorVector = writeSelec;
        this.readSelector = (Selector) this.readSelectorVector.elementAt(0);
        this.writeSelector = (Selector) this.writeSelectorVector.elementAt(0);
    }

    public void run() {
        boolean running = true;
        try {
            // note: this loop ends as soon as select() returns 0
            while (readSelector.select() > 0) {
                Set readyKeys = readSelector.selectedKeys();
                Iterator i = readyKeys.iterator();
                while (i.hasNext()) {
                    SelectionKey sk = (SelectionKey) i.next();
                    i.remove();
                    if (sk.isAcceptable()) {
                        ServerSocketChannel nextReady = (ServerSocketChannel) sk.channel();
                        SocketChannel channel = nextReady.accept();
                        channel.configureBlocking(false);
                        if (clientConnCount % 200 == 0)
                            System.out.println("Total number of clients connected " + clientConnCount);
                        //if (clientConnCount > 16000)
                        //    System.out.println("After 16K :: " + clientConnCount);
                        clientConnCount++;
                        // pick a selector for this connection: roughly one selector per 1000 connections
                        int index = clientConnCount / 1000;
                        if (index == 0 || clientConnCount == 1000) {
                            channel.register(readSelector, SelectionKey.OP_READ);
                            //writeSelector.wakeup();
                            //channel.register(writeSelector, SelectionKey.OP_WRITE);
                        }
                        else {
                            if (clientConnCount == ((index * 1000) + 1)) {
                                // first connection of a new block: start that block's reader and writer threads
                                System.out.println("-");
                                System.out.println("Launching from " + (index + 1) + " Set of Selector and ReadWrite Threads");
                                System.out.println("-");
                                Selector readSel = (Selector) readSelectorVector.elementAt(index);
                                Thread tRead = new Thread(new ExclusiveReaderThread(readSel));
                                tRead.setDaemon(true);
                                tRead.start();
                                Selector writeSel = (Selector) writeSelectorVector.elementAt(index);
                                Thread tWrite = new Thread(new WriteThread(writeSel, index + 1));
                                tWrite.setDaemon(true);
                                tWrite.start();
                            }
                            // register() may block while another thread is inside select() on the same selector
                            channel.register((Selector) readSelectorVector.elementAt(index), SelectionKey.OP_READ);
                            //((Selector) writeSelectorVector.elementAt(index)).wakeup();
                            //channel.register((Selector) writeSelectorVector.elementAt(index), SelectionKey.OP_WRITE);
                        }
                    }
                    else if (sk.isReadable()) {
                        //Read
                    }
                }
            }
        }
        catch (Exception ex) {
            System.out.println("Exception in selector loop: " + ex.toString());
            ex.printStackTrace();
            running = false;
        }
        catch (Throwable t) {
            System.out.println("Caught here 4");
            t.printStackTrace();
        }
    }
}

Third

-

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;

public class ExclusiveReaderThread extends Thread {

    private Selector readSel = null;

    public ExclusiveReaderThread(Selector readSelect)
    {
        super("Exclusive Read");
        this.readSel = readSelect;
    }

    public void run()
    {
        try {
            while (readSel.select() > 0) {
                Set readyKeys = readSel.selectedKeys();
                Iterator i = readyKeys.iterator();
                while (i.hasNext()) {
                    SelectionKey sk = (SelectionKey) i.next();
                    i.remove();
                    // only readable keys are handled; any other ready key is just removed
                    if (sk.isReadable()) {
                        //Read
                    }
                }
            }
        } catch (IOException e) {
            System.out.println("Caught 7");
            e.printStackTrace();
        }
        catch (Exception e) {
            System.out.println("Caught 6");
            e.printStackTrace();
        }
        catch (Throwable t) {
            System.out.println("Caught 5");
            t.printStackTrace();
        }
    }
}
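In the posted code, the `ServerSocketChannel` is registered for `OP_ACCEPT` with every read and write selector, while `ExclusiveReaderThread` and `WriteThread` only check `isReadable()` and `isWritable()`. Readiness is reported again on every selection for as long as the condition persists, so a ready event that is removed from the selected-key set but never consumed makes the next `select()` return at once, and such a loop can spin. The sketch below demonstrates this with a `java.nio.channels.Pipe`, used only because it needs no network; `LevelTriggeredDemo` and `readyReports` are hypothetical names.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class LevelTriggeredDemo {

    /**
     * Makes a pipe readable, then selects three times. If drain is false the data is never
     * read, so every selection reports the same key again; if true, only the first one does.
     */
    static int readyReports(boolean drain) throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open()) {
            Pipe.SourceChannel source = pipe.source();
            source.configureBlocking(false);
            source.register(selector, SelectionKey.OP_READ);
            pipe.sink().write(ByteBuffer.wrap(new byte[] {1, 2, 3})); // make the source readable

            int reports = 0;
            ByteBuffer buf = ByteBuffer.allocate(16);
            for (int i = 0; i < 3; i++) {
                if (selector.select(200) > 0) { // wait up to 200 ms for readiness
                    reports++;
                }
                selector.selectedKeys().clear(); // like i.remove() in the posted loops
                if (drain) {
                    buf.clear();
                    while (source.read(buf) > 0) { // consume everything currently available
                        buf.clear();
                    }
                }
            }
            return reports;
        } finally {
            pipe.source().close();
            pipe.sink().close();
        }
    }
}
```

With the data left unread, all three selections report the key; once it is drained, only the first one does. An unhandled `OP_ACCEPT` behaves the same way, which is one reason to register the listening channel only with the selector whose thread actually calls `accept()`.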

--

Fourth

--

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;

public class WriteThread extends Thread {

    private static long lastWriteTimeStamp = 0;
    private static long count = 1;            // shared by all WriteThread instances
    private int writeThreadId;                // per instance; a static field would be overwritten by every new WriteThread
    private Selector writeSelector = null;

    public WriteThread(Selector writeSelector, int writeThreadId) {
        super("WriteThread");
        this.writeSelector = writeSelector;
        this.writeThreadId = writeThreadId;
    }

    public void run() {
        boolean running = true;
        try {
            while (writeSelector.select() > 0) {
                Set readyKeys = writeSelector.selectedKeys();
                Iterator i = readyKeys.iterator();
                while (i.hasNext()) {
                    SelectionKey sk = (SelectionKey) i.next();
                    i.remove();
                    if (sk.isWritable()) {
                        // if (lastWriteTimeStamp == 0 || (System.currentTimeMillis() - lastWriteTimeStamp) > 5000)
                        // {
                        //SocketChannel client = (SocketChannel) sk.channel();
                        //String serverMessage = getMessageForClient();
                        //client.write(ByteBuffer.wrap(serverMessage.getBytes()));
                        //lastWriteTimeStamp = System.currentTimeMillis();
                        // }
                    }
                }
            }
        }
        catch (Exception ex) {
            System.out.println("Exception in write selector loop: " + ex.toString());
            ex.printStackTrace();
            running = false;
        }
        catch (Throwable t) {
            System.out.println("Caught 8");
            t.printStackTrace();
        }
    }

    private String getMessageForClient()
    {
        String msg = "Message " + count + " from NIOServer Write Thread " + writeThreadId;
        count++;
        return msg;
    }
}

Client - first one

--

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Random;

public class SampleBlockingClient implements Runnable
{
    Socket clientSocket = null;
    PrintWriter out = null;
    BufferedReader in = null;
    BufferedReader stdin = null;
    String serverText = null;
    String clientInput = null;
    public static final int BUFFER_SIZE = 5000;
    private long count = 1;
    String clientId = null;

    public SampleBlockingClient(String clientId)
    {
        this.clientId = clientId;
    }

    //ClientIdGenerator cig = ClientIdGenerator.getInstance();
    //int clientId = cig.getClientId();
    //public static void main(String[] args)
    //{
    ////String clientId = args[0];
    //String clientId = "1";
    //System.out.println("The clientId is " + clientId);
    //new SampleBlockingClient().startClientChat(clientId);
    //}

    public void run()
    {
        try
        {
            //clientSocket = new Socket("10.148.13.12",5555);
            clientSocket = new Socket();   // assign the field (not a local) so the finally block can close it
            InetSocketAddress isa = new InetSocketAddress("10.148.13.12", 5555);
            clientSocket.connect(isa, 0);  // timeout 0 means wait indefinitely
            //System.out.println("-");
            //System.out.println("Connected to NIOServer Client " + clientId);
            //System.out.println("-");
            in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
            out = new PrintWriter(clientSocket.getOutputStream(), true);
            // a second thread per client prints whatever the server sends
            Thread thread = new Thread(new BlockingClientReaderThread(in));
            thread.start();
            //stdin = new BufferedReader(new InputStreamReader(System.in));
            //while ((clientInput = stdin.readLine()) != null) {
            while (true) {
                clientInput = getMessage(clientId);
                //System.out.print("Me:: " + clientInput);
                //out.println(clientInput);
                //char[] buf = new char[BUFFER_SIZE];
                //int num = in.read(buf);
                //
                //System.out.println("");
                //System.out.print("NIOServer :: ");
                //System.out.print(getStringValue(buf,num));
                //
                ////System.out.println();
                ////System.out.print("Me:: ");
                //System.out.println("");
                Thread.sleep(2000);
            }
        }
        catch (IOException e)
        {
            System.out.println("Caught In Exception");
            e.printStackTrace();
            try {
                BufferedWriter errLog = new BufferedWriter(new FileWriter("/apps/opt/igo3/testing/Err_" + getRandomNumber() + ".txt"));
                errLog.write(e.toString());
                errLog.write("-Caught in IOException-");
                errLog.write("--");
                errLog.write(String.valueOf(e.getMessage()));   // getMessage() may be null
                errLog.close();
            } catch (IOException ex) {
                System.out.println("Caught In IO Exception while writing file");
                ex.printStackTrace();
            }
        }
        //catch (InterruptedException e) {
        //System.out.println("Caught In InterruptedException");
        //e.printStackTrace();
        //}
        catch (Throwable t)
        {
            System.out.println("Caught In Throwable Section");
            t.printStackTrace();
            try {
                BufferedWriter errLog = new BufferedWriter(new FileWriter("/apps/opt/igo3/testing/Err_" + getRandomNumber() + ".txt"));
                errLog.write(t.toString());
                errLog.write("Caught in Throwable--");
                errLog.write("--");
                errLog.write(String.valueOf(t.getMessage()));   // getMessage() may be null
                errLog.close();
            } catch (IOException ex) {
                System.out.println("Caught In IO Exception while writing file");
                ex.printStackTrace();
            }
        }
        finally
        {
            try
            {
                if (in != null)
                    in.close();
                if (out != null)
                    out.close();
                if (stdin != null)
                    stdin.close();
                if (clientSocket != null)
                    clientSocket.close();
            }
            catch (IOException e)
            {
                System.out.println("Caught In Exception In Finally Block");
                e.printStackTrace();
            }
        }
    }

    private String getMessage(String clientId)
    {
        String msg = "Message " + count + " from Client " + clientId;
        count++;
        return msg;
    }

    private String getStringValue(char[] buf, int num)
    {
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < num; i++)
            sb.append(buf[i]);
        return sb.toString();
    }

    private static int getRandomNumber() {
        Random random = new Random();
        int number = random.nextInt();
        if (number < 0)
            number = -number;
        return number;
    }
}

Client - second one

import java.io.BufferedReader;

public class BlockingClientReaderThread implements Runnable {

    private BufferedReader in;
    public static final int BUFFER_SIZE = 1000;

    public BlockingClientReaderThread(BufferedReader in) {
        this.in = in;
    }

    public void run() {
        this.readAtClient();
    }

    private void readAtClient()
    {
        try
        {
            while (true)
            {
                char[] buf = new char[BUFFER_SIZE];
                int num = in.read(buf);   // blocks until data arrives, end of stream or an error
                if (num == -1)
                    break;                // the server closed the connection
                System.out.println("");
                System.out.print("Server :: ");
                System.out.print(getStringValue(buf, num));
                System.out.println("");
            }
        }
        //catch (IOException e) {
        //System.out.println("Here in exception");
        //}
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    private String getStringValue(char[] buf, int num)
    {
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < num; i++)
            sb.append(buf[i]);
        return sb.toString();
    }
}

Client main program - which launches clients

public class ClientThreadGenerator {

    public static void main(String[] args) {
        try {
            String clientCount = args[0].trim();
            System.out.println("client Count is -" + clientCount + "-");
            int count = Integer.parseInt(clientCount);
            // one SampleBlockingClient thread per simulated client
            for (int i = 1; i <= count; i++) {
                Thread th = new Thread(new SampleBlockingClient(Integer.toString(i)));
                th.start();
            }
            System.out.println("################################################");
            System.out.println("Launched all " + count + " Threads");
            System.out.println("################################################");
        }
        catch (Throwable t)
        {
            System.out.println("Caught In Throwable");
            t.printStackTrace();
        }
    }
}

shannaraa at 2007-7-9 21:37:22 > top of Java-index,Core,Core APIs...
# 2

(a) You need to do better than print 'Caught 7' if you get an IOException. Print the actual exception message. Are you getting any of these?

(b) You need to show us the reading and writing code.

(c) Don't register a channel for OP_WRITE unless you actually have something to write to that channel. OP_WRITE is almost always 'on' so doing that just causes the selector to spin.

(d) You mentioned a server crash but provided no details, just client-side exception traces. What exactly do you mean by 'crash'?

ejpa at 2007-7-9 21:37:22
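Point (c) can be sketched as a small flush helper that asks for `OP_WRITE` only while a write is incomplete and drops that interest once the buffer is empty. This is a minimal sketch, assuming one outgoing `ByteBuffer` per connection; `WriteInterest` and `flush` are hypothetical names, and the helper works with any selectable channel that is also a `WritableByteChannel`, such as a `SocketChannel`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;

public class WriteInterest {

    /**
     * Writes as much of 'pending' as the channel accepts right now and returns true if it
     * was written completely. OP_WRITE is switched on only while bytes remain and switched
     * off once the buffer is empty, so an idle connection never wakes select() for writing.
     */
    static boolean flush(SelectionKey key, ByteBuffer pending) throws IOException {
        WritableByteChannel ch = (WritableByteChannel) key.channel();
        ch.write(pending); // non-blocking channel: may write only part of the buffer
        if (pending.hasRemaining()) {
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);  // wait for room
            return false;
        }
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);     // nothing left to send
        return true;
    }
}
```

A writer would call `flush` when it has a message for a client and again each time the key is selected as writable. While nothing is pending, the key carries no `OP_WRITE` interest, so `select()` does not keep returning for writability.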
# 3

Hi

Thanks for your reply. My answers are inline.

(a) You need to do better than print 'Caught 7' if you get an IOException. Print the actual exception message. Are you getting any of these?

[ans] It does not go into this exception block at all. In fact, I don't get any exception on the server side. In the Solaris console window, the process just ends and the prompt comes back.

(b) You need to show us the reading and writing code.

[ans] I have included the reading and writing code as well: the ExclusiveReaderThread class is the reading code, and the WriteThread class is the writing part. But I am not registering the channel for OP_WRITE at all, only for reading. In fact, I am not sending any data from the client either. I am just trying to load-test the server and check whether it supports this many users by simply accepting connections and registering them for reads (even though the client does not explicitly send any data).

(c) Don't register a channel for OP_WRITE unless you actually have something to write to that channel. OP_WRITE is almost always 'on' so doing that just causes the selector to spin.

[ans] I have not done that.

(d) You mentioned a server crash but provided no details, just client-side exception traces. What exactly do you mean by 'crash'?

[ans] I don't get any exceptions on the server side. In the Solaris console window, the process just ends and the prompt comes back.

shannaraa at 2007-7-9 21:37:22
# 4
Does the JVM have an exit code? $status? is there a core dump?
ejpa at 2007-7-9 21:37:22
# 5
Actually, I was looking for a core file in the current directory from which I started the server, but I could not find one. Where else could it be? Any idea?
shannaraa at 2007-7-9 21:37:22
# 6
If it isn't there it won't be anywhere. But echo $status when the JVM exits.
ejpa at 2007-7-9 21:37:22
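One way to get some output even when the JVM exits without an exception is a shutdown hook that lists the threads still alive. This is a sketch, not something used in this thread, and `ExitLogger` is a hypothetical name; it needs Java 8 or later for the lambda. Hooks run when the last non-daemon thread finishes, when `System.exit` is called, and on interrupts such as Ctrl-C or SIGTERM, but not on `kill -9` or a hard VM crash, so seeing the message at all already narrows down the cause.

```java
public class ExitLogger {

    /** Registers, and returns, a hook that prints every live thread when the JVM shuts down. */
    static Thread install() {
        Thread hook = new Thread(() -> {
            System.err.println("JVM shutting down; live threads:");
            for (Thread t : Thread.getAllStackTraces().keySet()) {
                System.err.println("  " + t.getName() + (t.isDaemon() ? " (daemon)" : ""));
            }
        }, "exit-logger");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

Calling `ExitLogger.install()` at the start of `main` prints the thread list at exit. If the application threads listed are all daemons, the process most likely ended because its last non-daemon thread finished.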
# 7

Does this mystical magical exit of the server happen after about 500 seconds of runtime?

> public static void main (String argv[]){
>     SelectorTestServer s = new SelectorTestServer();
>     s.startListening();
>     try{
>         Thread.currentThread().sleep(500000);
>     }
>     catch (Exception e){
>         System.err.println(e.toString());
>     }
>     catch(Throwable t){
>         System.out.println("Caught 3");
>         t.printStackTrace();
>     }
> }

(also see related Thread.setDaemon(true) calls elsewhere in the code.)

sjasjaa at 2007-7-9 21:37:22
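In the posted server every selector thread is started with `setDaemon(true)`, so `main` is the only non-daemon application thread. It sleeps 500,000 ms (8 minutes 20 seconds) and returns, and the JVM then exits without any exception. A minimal fix, sketched here in Java 8 with hypothetical names, is to have `main` wait for the selector thread with `join()`; not marking that thread as a daemon would also work.

```java
public class KeepAlive {

    /** Starts the loop on a daemon thread, as the posted server does, then waits for it. */
    static Thread startAndWait(Runnable selectorLoop) throws InterruptedException {
        Thread t = new Thread(selectorLoop, "SelectorThread");
        t.setDaemon(true); // daemon threads alone do not keep the JVM running...
        t.start();
        t.join();          // ...so the non-daemon caller blocks here until the loop ends
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        startAndWait(() -> {
            // stand-in for the selector loop; a real server would block in select() here
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("selector loop finished");
        });
        System.out.println("main returns only after the loop has finished");
    }
}
```

In `SelectorTestServer.main`, the equivalent change would be to replace the `sleep(500000)` call with a `join()` on the `SelectorThread` that `startListening()` stores in `readThread`.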
# 8
I found the issue. The main thread was exiting, and so the server ended. Thanks a lot to all of you for your help.
shannaraa at 2007-7-9 21:37:22