Multithreaded NIO Server crash, Client Connection reset
I am coding a NIO based server and plain socket blocking client setup. We need to cater to 40K client connections. The server setup is such that there is one main selector thread which accepts client connections, which inturn launches separate Selector threads for every 1000 connections for read and write operations. Currently I have registered only OP_READ operation for the channel with the selectors. The server program is able to accept connetions till 20K after that when I try to increase the client count in increments of 2000, midway (after 24K) while establising the connections, the server program crashes and ends and there is no exception or error at the server side. At the client side I get the below exception. I am not setting any SO_LINGER or SO_TIMEOUT parameters. At the server the heap parameters that I use is --> java -Xms2048m -Xmx2048m -Xss128k and is being run using Solaris 8. Any tips/suggestions, please help..
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:168)
at sun.nio.cs.StreamDecoder$CharsetSD.readBytes(StreamDecoder.java:408)
at sun.nio.cs.StreamDecoder$CharsetSD.implRead(StreamDecoder.java:450)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:182)
at java.io.InputStreamReader.read(InputStreamReader.java:167)
at java.io.BufferedReader.fill(BufferedReader.java:136)
at java.io.BufferedReader.read1(BufferedReader.java:187)
at java.io.BufferedReader.read(BufferedReader.java:261)
at java.io.Reader.read(Reader.java:100)
at BlockingClientReaderThread.readAtClient(BlockingClientReaderThread.java:25)
at BlockingClientReaderThread.run(BlockingClientReaderThread.java:15)
at java.lang.Thread.run(Thread.java:534)
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:168)
at sun.nio.cs.StreamDecoder$CharsetSD.readBytes(StreamDecoder.java:408)
at sun.nio.cs.StreamDecoder$CharsetSD.implRead(StreamDecoder.java:450)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:182)
at java.io.InputStreamReader.read(InputStreamReader.java:167)
at java.io.BufferedReader.fill(BufferedReader.java:136)
at java.io.BufferedReader.read1(BufferedReader.java:187)
at java.io.BufferedReader.read(BufferedReader.java:261)
at java.io.Reader.read(Reader.java:100)
at BlockingClientReaderThread.readAtClient(BlockingClientReaderThread.java:25)
at BlockingClientReaderThread.run(BlockingClientReaderThread.java:15)
at java.lang.Thread.run(Thread.java:534)
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:168)
at sun.nio.cs.StreamDecoder$CharsetSD.readBytes(StreamDecoder.java:408)
at sun.nio.cs.StreamDecoder$CharsetSD.implRead(StreamDecoder.java:450)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:182)
at java.io.InputStreamReader.read(InputStreamReader.java:167)
at java.io.BufferedReader.fill(BufferedReader.java:136)
at java.io.BufferedReader.read1(BufferedReader.java:187)
at java.io.BufferedReader.read(BufferedReader.java:261)
at java.io.Reader.read(Reader.java:100)
at BlockingClientReaderThread.readAtClient(BlockingClientReaderThread.java:25)
at BlockingClientReaderThread.run(BlockingClientReaderThread.java:15)
at java.lang.Thread.run(Thread.java:534)
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:168)
at sun.nio.cs.StreamDecoder$CharsetSD.readBytes(StreamDecoder.java:408)
at sun.nio.cs.StreamDecoder$CharsetSD.implRead(StreamDecoder.java:450)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:182)
at java.io.InputStreamReader.read(InputStreamReader.java:167)
at java.io.BufferedReader.fill(BufferedReader.java:136)
at java.io.BufferedReader.read1(BufferedReader.java:187)
at java.io.BufferedReader.read(BufferedReader.java:261)
at java.io.Reader.read(Reader.java:100)
at BlockingClientReaderThread.readAtClient(BlockingClientReaderThread.java:25)
at BlockingClientReaderThread.run(BlockingClientReaderThread.java:15)
at java.lang.Thread.run(Thread.java:534)
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:305)
at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:171)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:158)
at java.net.Socket.connect(Socket.java:461)
at SampleBlockingClient.run(SampleBlockingClient.java:52)
at java.lang.Thread.run(Thread.java:534)
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:305)
at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:171)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:158)
at java.net.Socket.connect(Socket.java:461)
at SampleBlockingClient.run(SampleBlockingClient.java:52)
at java.lang.Thread.run(Thread.java:534)
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:305)
at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:171)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:158)
at java.net.Socket.connect(Socket.java:461)
at SampleBlockingClient.run(SampleBlockingClient.java:52)
at java.lang.Thread.run(Thread.java:534)
[5803 byte] By [
shannaraa] at [2007-11-26 19:19:56]

# 1
I just thought I will give some code snippets here..Basically I have 4 classes at the server.. and the Client has 3 classes, launch a new thread for every client. The client is a blocking one. The server is running in a differnt machine and the client in a different machine I am pasting the entire code for your convenience.Please let me know if there is any problem in the approach to cater to so many thousands of clients. I incrementally add clients by running the new client for every 5000 connecitons
-
First One
public class SelectorTestServer {
private static int clientKCount = 70;
private Vector readSelectorVec = new Vector();
private Vector writeSelectorVec = new Vector();
private ServerSocketChannel ssch = null;
private SelectorThread readThread = null;
private WriteThread writeThread = null;
protected void startListening () {
try{
Selector readSelector = null;
Selector writeSelector = null;
ssch = ServerSocketChannel.open();
ssch.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress("10.148.13.12", 5555);
ssch.socket().bind(isa);
for(int i=0;i<clientKCount;i++)
{
readSelector = SelectorProvider.provider().openSelector();
writeSelector = SelectorProvider.provider().openSelector();
ssch.register(readSelector,SelectionKey.OP_ACCEPT);
ssch.register(writeSelector,SelectionKey.OP_ACCEPT);
readSelectorVec.add(readSelector);
writeSelectorVec.add(writeSelector);
}
}
catch (Exception e){
System.out.println("Caught here 1");
e.printStackTrace();
}
catch (Throwable t)
{
System.out.println("Caught here 2");
t.printStackTrace();
}
System.out.println("-");
System.out.println("Launching from first Set of Selector and ReadWrite Threads");
System.out.println("-");
this.writeThread = new WriteThread((Selector)writeSelectorVec.elementAt(0), 1);
this.writeThread.setDaemon(true);
this.writeThread.start();
this.readThread = new SelectorThread(readSelectorVec, writeSelectorVec);
this.readThread.setDaemon(true);
this.readThread.start();
}
public static void main (String argv[]){
SelectorTestServer s = new SelectorTestServer();
s.startListening();
try{
Thread.currentThread().sleep(500000);
}
catch (Exception e){
System.err.println(e.toString());
}
catch(Throwable t){
System.out.println("Caught 3");
t.printStackTrace();
}
}
--
Second
--
public class SelectorThread extends Thread {
private int clientConnCount = 1;
private Selector readSelector = null;
private Selector writeSelector = null;
private Vector readSelectorVector = new Vector();
private Vector writeSelectorVector = new Vector();
public SelectorThread(Vector readSelec, Vector writeSelec) {
super("SelectorThread");
this.readSelectorVector = readSelec;
this.writeSelectorVector = writeSelec;
this.readSelector = (Selector)this.readSelectorVector.elementAt(0);
this.writeSelector = (Selector)this.writeSelectorVector.elementAt(0);
}
public void run () {
boolean running = true;
try{
while(readSelector.select() > 0){
Set readyKeys = readSelector.selectedKeys();
Iterator i = readyKeys.iterator();
while (i.hasNext()){
SelectionKey sk = (SelectionKey) i.next();
i.remove();
if (sk.isAcceptable()){
ServerSocketChannel nextReady = (ServerSocketChannel) sk.channel();
SocketChannel channel = nextReady.accept();
channel.configureBlocking(false);
if(clientConnCount % 200 ==0)
System.out.println("Total number of clients connected " + clientConnCount);
//if(clientConnCount>16000)
//System.out.println("After 16K :: " + clientConnCount);
clientConnCount++;
int index = clientConnCount/1000;
if(index==0 || clientConnCount==1000){
channel.register(readSelector, SelectionKey.OP_READ);
//writeSelector.wakeup();
//channel.register(writeSelector, SelectionKey.OP_WRITE);
}
else{
if(clientConnCount==((index*1000) +1)){
System.out.println("-");
System.out.println("Launching from " + (index+1) + " Set of Selector and ReadWrite Threads");
System.out.println("-");
Selector readSel = (Selector)readSelectorVector.elementAt(index);
Thread tRead = new Thread(new ExclusiveReaderThread(readSel));
tRead.setDaemon(true);
tRead.start();
Selector writeSel = (Selector)writeSelectorVector.elementAt(index);
Thread tWrite = new Thread(new WriteThread(writeSel,index+1));
tWrite.setDaemon(true);
tWrite.start();
}
channel.register((Selector)readSelectorVector.elementAt(index), SelectionKey.OP_READ);
//((Selector)writeSelectorVector.elementAt(index)).wakeup();
//channel.register((Selector)writeSelectorVector.elementAt(index), SelectionKey.OP_WRITE);
}
}
else if (sk.isReadable()){
//Read
}
}
}
}
catch (Exception ex){
System.out.println("Exception in selector loop: "+ex.toString());
ex.printStackTrace();
running = false;
}
catch (Throwable t)
{
System.out.println("Caught here 4");
t.printStackTrace();
}
}
}
Third
-
public class ExclusiveReaderThread extends Thread {
private Selector readSel = null;
public ExclusiveReaderThread(Selector readSelect)
{
super("Exclusive Read");
this.readSel = readSelect;
}
public void run()
{
try{
while(readSel.select() > 0){
Set readyKeys = readSel.selectedKeys();
Iterator i = readyKeys.iterator();
while (i.hasNext()){
SelectionKey sk = (SelectionKey) i.next();
i.remove();
if (sk.isReadable()){
//Read
}
}
}
} catch (IOException e) {
System.out.println("Caught 7");
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
catch (Exception e){
System.out.println("Caught 6");
e.printStackTrace();
}
catch(Throwable t){
System.out.println("Caught 5");
t.printStackTrace();
}
}
}
--
Fourth
--
public class WriteThread extends Thread {
private static long lastWriteTimeStamp=0;
private static long count=1;
private static int writeThreadId;
private Selector writeSelector = null;
public WriteThread(Selector writeSelector, int writeThreadId) {
super("WriteThread");
this.writeSelector = writeSelector;
this.writeThreadId = writeThreadId;
}
public void run () {
boolean running = true;
try{
while(writeSelector.select() > 0){
Set readyKeys = writeSelector.selectedKeys();
Iterator i = readyKeys.iterator();
while (i.hasNext()){
SelectionKey sk = (SelectionKey) i.next();
i.remove();
if (sk.isWritable()){
// if(lastWriteTimeStamp==0 || (System.currentTimeMillis()-lastWriteTimeStamp)>5000)
// {
//SocketChannel client = (SocketChannel) sk.channel();
//String serverMessage = getMessageForClient();
//client.write(ByteBuffer.wrap(serverMessage.getBytes()));
//lastWriteTimeStamp = System.currentTimeMillis();
// }
}
}
}
}
catch (Exception ex){
System.out.println("Exception in write selector loop: "+ex.toString());
ex.printStackTrace();
running = false;
}
catch(Throwable t)
{
System.out.println("Caught 8");
t.printStackTrace();
}
}
private static String getMessageForClient()
{
String msg = "Message " + count + " from NIOServer Write Thread " + writeThreadId;
count++;
return msg;
}//end run()
}
Client - first one
--
public class SampleBlockingClient implements Runnable
{
Socket clientSocket=null;
PrintWriter out=null;
BufferedReader in=null;
BufferedReader stdin=null;
String serverText = null;
String clientInput = null;
public static final int BUFFER_SIZE=5000;
private long count=1;
String clientId =null;
public SampleBlockingClient(String clientId)
{
this.clientId = clientId;
}
//ClientIdGenerator cig = ClientIdGenerator.getInstance();
//int clientId = cig.getClientId();
//public static void main(String[] args)
//{
////String clientId = args[0];
//String clientId = "1";
//System.out.println("The clientId is " + clientId);
//new SampleBlockingClient().startClientChat(clientId);
//}
public void run()
{
try
{
//clientSocket = new Socket("10.148.13.12",5555);
Socket clientSocket = new Socket();
InetSocketAddress isa = new InetSocketAddress("10.148.13.12",5555);
clientSocket.connect(isa,0);
//System.out.println("-");
//System.out.println("Connected to NIOServer Client " + clientId);
//System.out.println("-");
in=new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
out=new PrintWriter(clientSocket.getOutputStream(),true);
Thread thread = new Thread(new BlockingClientReaderThread(in));
thread.start();
//stdin = new BufferedReader(new InputStreamReader(System.in));
//while ((clientInput = stdin.readLine()) != null) {
while(true){
clientInput=getMessage(clientId);
//System.out.print("Me:: " + clientInput);
//out.println(clientInput);
//char[] buf = new char[BUFFER_SIZE];
//int num = in.read(buf);
//
//System.out.println("");
//System.out.print("NIOServer :: ");
//System.out.print(getStringValue(buf,num));
//
////System.out.println();
////System.out.print("Me:: ");
//System.out.println("");
Thread.sleep(2000);
}
}
catch (IOException e)
{
System.out.println("Caught In Exception");
e.printStackTrace();
try {
BufferedWriter out = new BufferedWriter(new FileWriter("/apps/opt/igo3/testing/Err_" +getRandomNumber() + ".txt" ));
out.write(e.toString());
out.write("-Caught in IOException-");
out.write("--");
out.write(e.getMessage());
out.close();
} catch (IOException ex) {
System.out.println("Caught In IO Exception wile writing file");
ex.printStackTrace();
}
}
//catch (InterruptedException e) {
//System.out.println("Caught In InterruptedException");
//e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
//}
catch (Throwable t)
{
System.out.println("Caught In Throwable Section");
t.printStackTrace() ;
try {
BufferedWriter out = new BufferedWriter(new FileWriter("/apps/opt/igo3/testing/Err_" +getRandomNumber() + ".txt" ));
out.write(t.toString());
out.write("Caught in Throwable--");
out.write("--");
out.write(t.getMessage());
out.close();
} catch (IOException ex) {
System.out.println("Caught In IO Exception wile writing file");
ex.printStackTrace();
}
}
finally
{
try
{
if(in!=null)
in.close();
if(out!=null)
out.close();
if(stdin!=null)
stdin.close();
if(clientSocket!=null)
clientSocket.close();
}
catch (IOException e)
{
System.out.println("Caught In Exception In Finally Block");
e.printStackTrace();
}
}
}
private String getMessage(String clientId)
{
String msg = "Message " + count + " from Client " + clientId ;
count++;
return msg;
}
private String getStringValue(char[]buf, int num)
{
StringBuffer sb = new StringBuffer();
for(int i=0;i<num;i++)
sb.append(buf[i]);
return sb.toString();
}
private static int getRandomNumber(){
Random random = new Random();
int number = random.nextInt();
if(number<0)
number=-number;
return number;
}
}
Client - second one
public class BlockingClientReaderThread implements Runnable {
private BufferedReader in;
public static final int BUFFER_SIZE=1000;
public BlockingClientReaderThread(BufferedReader in){
this.in=in;
}
public void run(){
this.readAtClient();
}
private void readAtClient()
{
try
{
while(true)
{
char[] buf = new char[BUFFER_SIZE];
int num = in.read(buf);
System.out.println("");
System.out.print("Server :: ");
System.out.print(getStringValue(buf,num));
System.out.println("");
}
}
//catch (IOException e) {
//System.out.println("Here in exception");
//}
catch(Exception e)
{
e.printStackTrace();
}
}
private String getStringValue(char[]buf, int num)
{
StringBuffer sb = new StringBuffer();
for(int i=0;i<num;i++)
sb.append(buf[i]);
return sb.toString();
}
}
Client main program - which launches clients
public class ClientThreadGenerator {
public static void main(String[] args) {
try{
String clientCount = args[0].trim();
System.out.println("client Count is -" + clientCount + "-");
int count = Integer.parseInt(clientCount);
for(int i=1;i<=count;i++){
Thread th = new Thread(new SampleBlockingClient(Integer.toString(i)));
th.start();
}
System.out.println("################################################");
System.out.println("Launched all " + count + "Threads ");
System.out.println("################################################");
}
catch (Throwable t)
{
System.out.println("Caught In Throwable");
t.printStackTrace() ;
}
}
}