NIO multi second delay when making many connections
I wrote a test client that makes hundreds of non-blocking connections to a test server, which simply waits until all connections are made, then returns a canned response on each connection and closes the socket. When I run these tests with 100 connections, with the client and server communicating across the Internet, I consistently see a pause of approximately 3 seconds at about the 42nd connection (it varies) and again around the 97th connection. The test client makes all 100 connections before writing a command to the server, and the server does not write out a response until all of the connections are accepted and the commands received (synchronized manually by pressing the Enter key). Connection requests are made 50 at a time (finishConnect must be performed on the first 50 before the second 50 is initiated). I can reproduce this with a variety of combinations of computers communicating across the Internet. The platforms are Windows 2000 Server and Windows XP Professional (in various combinations).
Below is the output of the test client showing connections as a function of time (with the uninteresting sections abbreviated). You can see that at connection 46 there is a pause of approximately 2.8 seconds before any more connections are completed. I suppose it may be reasonable for a single socket to experience a delay, but many "non-blocking" connections appear to block behind one or two slow connections, which I would not expect - perhaps this is a bug?
I have included both the client and server code at the end of this post for anyone interested.
Client output:
Starting connections
started:50, :1, max:100,time=70
started:50, :2, max:100,time=70
started:50, :3, max:100,time=70
...
started:50, :45, max:100,time=210
started:50, :46, max:100,time=210
started:50, :46, max:100,time=1212
started:50, :46, max:100,time=2213
started:50, :47, max:100,time=3065
started:50, :48, max:100,time=3075
started:50, :49, max:100,time=3075
...
started:100, :90, max:100,time=3275
started:100, :91, max:100,time=3285
started:100, :91, max:100,time=4286
started:100, :91, max:100,time=5288
started:100, :92, max:100,time=6079
started:100, :93, max:100,time=6079
started:100, :94, max:100,time=6079
started:100, :95, max:100,time=6089
started:100, :96, max:100,time=6089
started:100, :97, max:100,time=6099
started:100, :98, max:100,time=6099
started:100, :99, max:100,time=6099
started:100, :99, max:100,time=7100
...
started:100, :99, max:100,time=21121
started:100, :99, max:100,time=22112
started:100, :99, max:100,time=23113
All connections done (24045)
Cumulative conns: 100
All writes done (40)
Server output:
Waiting for enter...
(Reports every 5 seconds:)
24 connections accepted
91 connections accepted
99 connections accepted
99 connections accepted
99 connections accepted
100 connections accepted
Client Code:
package loadtester;
import java.io.*;
import java.util.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import loadtester.FastStringBuffer;
/**
 * NIO load-test client.
 *
 * Opens <code>maxInstances</code> non-blocking connections to port 80 of the server named on the
 * command line, in batches of at most 50. When every connection has completed, one HTTP GET is
 * written on each channel and the client waits for the canned response on each of them. Once all
 * responses have arrived the counters are reset and, after a 2 second idle period, the whole
 * cycle starts again.
 *
 * All work is done on the thread that runs the constructor; the class is not thread-safe.
 */
public class Main {
    private CharsetEncoder encoder;
    private CharsetDecoder decoder;

    // Connection state machine driven by doConns().
    private final int ST_DOCONNS = 1;   // batches of connects are being issued
    private final int ST_IDLEWAIT = 2;  // pausing before the next test cycle
    private final int ST_NOCONNS = 3;   // all connections made; no connects until the cycle ends
    private int state = ST_IDLEWAIT;

    public static String svrAddr;
    static private int maxInstances;
    private Selector selector = null;
    // A response is considered complete once this text has been received.
    private static final char[] readDonePattern = "<html><body>Test Response</body></html>".toCharArray();
    private int connsStarted=0;
    private int connsDone=0;
    private InetSocketAddress socketAddress;
    private ByteBuffer getReqBuf=null;
    private long connStartTime=0;
    private long connWaitStartTime=0;
    private long totalRcvd=0;
    private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
    private ArrayList writeChans;
    private int writeCnt=0;
    private long writeStartTime=0;
    private int connCnt=0;
    private long tstsn=0;

    /**
     * Drives the connection state machine; called once per select-loop iteration.
     * Starts the next batch of up to 50 connects only when every connect started so far has
     * completed; otherwise it just prints a progress line.
     */
    private void doConns() {
        if (state == ST_NOCONNS) {
            return;
        }
        if (state == ST_IDLEWAIT) {
            long curTime = System.currentTimeMillis();
            if ((curTime - connWaitStartTime) < 2000) { // wait 2 seconds before starting reconnects
                return;
            }
            System.out.println("Starting connections");
            tstsn++;
            connStartTime = System.currentTimeMillis();
            totalRcvd = 0;
            state = ST_DOCONNS;
        }
        if (connsDone == maxInstances || connsStarted != connsDone) {
            System.out.println("started:" + connsStarted + ", :" + connsDone + ", max:" + maxInstances
                    + ",time=" + (System.currentTimeMillis() - this.connStartTime));
            return;
        }
        // connect() reports its own failures, so no exception handling is needed here.
        int loopMax = Math.min(maxInstances - connsDone, 50);
        for (int i = 0; i < loopMax; i++) {
            connect();
        }
    }

    /** Creates the charset codecs, the selector and the target address, and resets the counters. */
    private void init() {
        try {
            Charset charset = Charset.forName("ISO-8859-1");
            encoder = charset.newEncoder();
            decoder = charset.newDecoder();
            selector = Selector.open();
            connsStarted = 0;
            connsDone = 0;
            socketAddress = new InetSocketAddress(Main.svrAddr, 80);
            writeChans = new ArrayList();
        } catch (Exception e) {
            // Previously swallowed silently; the client cannot work without these objects,
            // so at least make the cause visible.
            System.out.println("init failed: " + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * Starts one non-blocking connect and registers the channel for OP_CONNECT.
     * Failures are reported on stdout; the channel is closed so it does not leak.
     */
    public void connect ( ) {
        SocketChannel channel = null;
        try {
            channel = SocketChannel.open();
            channel.configureBlocking(false); // set non-blocking
            channel.connect(socketAddress);
            channel.register(selector, SelectionKey.OP_CONNECT);
            connsStarted++;
        } catch (UnknownHostException e) {
            System.out.println("Connect: error - bad host");
            closeQuietly(channel);
        } catch (IOException e) {
            System.out.println("Connect: error: " + e.getMessage());
            closeQuietly(channel);
        } catch (Exception e) {
            System.out.println("Connect exception: " + e.getMessage());
            closeQuietly(channel);
        }
    }

    /** Closes a channel, ignoring a null argument and any error raised by the close itself. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException ignored) {
            // best effort: the channel is being discarded anyway
        }
    }

    /** Registers every connected channel for OP_WRITE and starts the write timer. */
    private void doWrites() {
        Iterator it = writeChans.iterator();
        while (it.hasNext()) {
            SocketChannel chan = (SocketChannel) it.next();
            try {
                chan.register(selector, SelectionKey.OP_WRITE);
            } catch (Exception e) {
                System.out.println("doWrites: register failed: " + e.getMessage());
            }
        }
        writeChans.clear();
        writeStartTime = System.currentTimeMillis();
        writeCnt = 0;
    }

    /**
     * Handles one ready key: completes a pending connect, sends the request on a writable
     * channel, or reads response data from a readable one.
     *
     * @param key the selected key
     * @throws IOException if writing or reading fails; the caller cancels the key
     */
    public void process( SelectionKey key ) throws IOException {
        SocketChannel keyChannel = (SocketChannel) key.channel();
        if (keyChannel.isConnectionPending()) {
            try {
                // Clear the interest set so the key is not reported again before doWrites().
                keyChannel.register(selector, 0);
                keyChannel.finishConnect();
                writeChans.add(keyChannel);
                connsDone++;
                if (connsDone == maxInstances) {
                    connCnt += connsDone;
                    state = ST_NOCONNS;
                    System.out.println("All connections done (" + (System.currentTimeMillis() - this.connStartTime) + ")");
                    doWrites();
                    System.out.println("Cumulative conns: " + connCnt);
                }
            } catch (Exception e) {
                if (connsStarted > 0) {
                    connsStarted--;
                }
                key.cancel();
                closeQuietly(keyChannel); // a failed connect leaves the channel unusable
                System.out.println("Conn reg exc: " + e.getMessage());
            }
        } else if (key.isWritable()) {
            keyChannel.register(selector, SelectionKey.OP_READ, new FastStringBuffer(1000, 500));
            write(keyChannel);
            writeCnt++;
            if (writeCnt == maxInstances) {
                System.out.println("All writes done (" + (System.currentTimeMillis() - writeStartTime) + ")");
                writeCnt = 0;
            }
        } else if (key.isReadable()) {
            read(key);
        }
    }

    /**
     * Sends the test GET request on the channel. On failure the channel is closed and the
     * connection counters are decremented.
     *
     * @param channel a connected, non-blocking channel
     * @throws IOException if the write fails (the original exception is rethrown)
     */
    public void write(SocketChannel channel ) throws IOException {
        try {
            char[] sendChars = "GET /testget HTTP/1.0\r\nHost: NIOTestClient\r\n\r\n".toCharArray();
            getReqBuf = encoder.encode(CharBuffer.wrap(sendChars, 0, sendChars.length));
            channel.write(getReqBuf);
        } catch (IOException e) {
            System.out.println("write IOException: " + e.getMessage());
            channel.close();
            connsStarted--;
            connsDone--;
            throw e; // keep the original type and stack trace
        } catch (Exception e) {
            System.out.println("write exception: " + e.getMessage());
            channel.close();
            connsStarted--;
            connsDone--;
        }
    }

    /**
     * Reads available response data into the key's FastStringBuffer attachment and, once the
     * expected response text has been seen, closes the channel and counts the response. When the
     * last response arrives the counters are reset and the client enters the idle-wait state.
     *
     * @param key a readable key whose attachment is the FastStringBuffer set in process()
     * @throws IOException if the read fails or the peer closes before the full response arrived
     */
    public void read( SelectionKey key ) throws IOException {
        buffer.clear();
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            if (channel.read(buffer) == -1) {
                // End of stream without the expected text. Without this the key would remain
                // readable forever and the select loop would spin; treat it as a read error.
                throw new IOException("connection closed before the full response arrived");
            }
            buffer.flip();
            // Copy exactly the decoded characters rather than exposing the decoder's backing array.
            CharBuffer decoded = decoder.decode(buffer);
            char[] result = new char[decoded.remaining()];
            decoded.get(result);
            buffer.clear();

            FastStringBuffer responseText = (FastStringBuffer) key.attachment();
            responseText.appendChars(result);
            if (!responseText.contains(readDonePattern)) {
                return; // response not complete yet
            }

            totalRcvd++;
            System.out.println("totalRcvd:" + totalRcvd);
            responseText.clear();
            key.cancel();
            channel.close();
            if (totalRcvd == maxInstances) {
                System.out.println("All responses received.");
                if (connsDone != maxInstances || connsStarted != maxInstances) {
                    System.out.println("started: " + connsStarted + ", done: " + connsDone);
                }
                connsDone = 0;
                connsStarted = 0;
                totalRcvd = 0;
                state = ST_IDLEWAIT;
                this.connWaitStartTime = System.currentTimeMillis();
            }
        } catch (IOException e) {
            System.out.println("Read: error: " + e.getMessage());
            try {
                channel.close();
                connsDone--;
                connsStarted--;
            } catch (IOException ignored) {
                // nothing more can be done for this channel
            }
            throw e; // keep the original type and stack trace
        } catch (Exception e) {
            channel.close();
            connsDone--;
            connsStarted--;
            System.out.println("Exception e:" + e.getMessage());
        }
    }

    /**
     * Initializes the client and runs the select loop until an unexpected exception ends it.
     */
    public Main() {
        init();
        try {
            long noneRdyCnt = 0;
            long startTime = 0;
            boolean done = false;
            while (!done) {
                doConns();
                selector.select(1000);
                Set readyKeys = selector.selectedKeys();
                if (readyKeys.size() > 0) {
                    Iterator readyItor = readyKeys.iterator();
                    while (readyItor.hasNext()) {
                        SelectionKey key = (SelectionKey) readyItor.next();
                        readyItor.remove();
                        try {
                            process(key);
                        } catch (IOException ioe) {
                            System.out.println("Main IOException: " + ioe.getMessage());
                            key.cancel();
                        }
                    }
                    noneRdyCnt = 0;
                } else {
                    // Detect select(1000) returning early with nothing ready: two empty selects
                    // (the 21st and 22nd) should take about 2 seconds, not under 1.
                    noneRdyCnt++;
                    if (noneRdyCnt == 20) {
                        startTime = System.currentTimeMillis();
                    } else if (noneRdyCnt == 22) {
                        if ((System.currentTimeMillis() - startTime) < 1000) {
                            System.out.println("Select blocking failure");
                            Thread.sleep(500);
                        }
                    }
                }
            }
        } catch (Exception e) {
            System.out.println("Exception: " + e.getMessage());
            e.printStackTrace();
        }
        System.out.println("Sessions: " + maxInstances);
    }

    /**
     * Entry point.
     *
     * @param args server address followed by the number of connections to open
     */
    public static void main(String[] args) {
        //usage: loadtester.Main <svrAddr> <instances> <webcastkey>
        if (args.length < 2) {
            System.out.println("usage: NIOServerLdTst.Main <svrAddr> <instances>");
            return;
        }
        try {
            svrAddr = args[0];
            maxInstances = Integer.parseInt(args[1]);
        } catch (NumberFormatException e) {
            System.out.println("usage: NIOServerLdTst.Main <svrAddr> <instances>");
            return;
        }
        new Main();
    }
}
package loadtester;
/**
 * Minimal growable character buffer with a substring search.
 *
 * The backing array is exposed through the public <code>buf</code> field; only the first
 * <code>length()</code> characters of it are valid. Not thread-safe.
 */
public final class FastStringBuffer {
    /** Backing storage; may be replaced by a larger array when the buffer grows. */
    public char[] buf=null;
    private int len=0;
    /** Extra capacity added, beyond what is needed, each time the buffer grows. */
    private int reallocSz=100;

    /** Grows the backing array if appending addedLen characters would overflow it. */
    private void chkFreeSpace( int addedLen ) {
        if ((len + addedLen) > buf.length) {
            char[] newBuf = new char[buf.length + addedLen + reallocSz];
            System.arraycopy(buf, 0, newBuf, 0, len);
            buf = newBuf;
        }
    }

    /**
     * @param len     initial capacity in characters
     * @param allocSz extra capacity added on every growth
     */
    public FastStringBuffer(int len, int allocSz) {
        buf = new char[len];
        reallocSz = allocSz;
    }

    /** Creates a buffer whose initial content is s. */
    public FastStringBuffer(String s) {
        buf = new char[s.length()];
        reallocSz = s.length() + 100;
        appendString(s);
    }

    /** Empties the buffer; the capacity is kept. */
    public void clear() {
        len = 0;
    }

    public final void appendString( String s ) {
        appendChars(s.toCharArray());
    }

    /** Appends every character of charBuf. */
    public final void appendChars( char[] charBuf ) {
        chkFreeSpace(charBuf.length);
        System.arraycopy(charBuf, 0, buf, len, charBuf.length);
        len += charBuf.length;
    }

    public final void appendChar( char c ) {
        chkFreeSpace(1);
        buf[len++] = c;
    }

    public final String toString() {
        return new String(buf, 0, len);
    }

    /** @return the number of valid characters in the buffer */
    public final int length() {
        return len;
    }

    /**
     * Tests whether pattern occurs anywhere in the valid part of the buffer.
     *
     * @param pattern the characters to look for; an empty pattern always matches
     * @return true if the buffer contains pattern as a contiguous run
     */
    public boolean contains( char[] pattern ) {
        int plen = pattern.length;
        for (int s = 0; s <= len - plen; s++) {
            boolean match = true;
            for (int i = 0; i < plen; i++) {
                // Compare character by character; the posted code compared against the
                // whole pattern array, which does not compile.
                if (buf[s + i] != pattern[i]) {
                    match = false;
                    break;
                }
            }
            if (match) {
                return true;
            }
        }
        return false;
    }
}
Server Code:
package http;
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.nio.charset.*;
import java.util.*;
import java.text.*;
/**
 * Test server bootstrap: creates the two selectors and the shared connection list, then starts
 * the accept, read/write and console threads.
 */
public class Server {
    private AcceptThread acceptor;
    private ReadWriteThread responder;
    private ConsoleThread consoleThread;
    /** Listening port; may be overridden from the command line. */
    public static int port=80;

    /**
     * Builds and starts the worker threads. If any of them cannot be created the failure is
     * reported and nothing is started.
     */
    public void start() {
        AcceptThread newAcceptor;
        ReadWriteThread newResponder;
        ConsoleThread newConsole;
        try {
            Selector acceptSelector = Selector.open();
            Selector readSelector = Selector.open();
            LinkedList connections = new LinkedList();
            newAcceptor = new AcceptThread(acceptSelector, connections, port, readSelector);
            newResponder = new ReadWriteThread(readSelector, connections);
            newConsole = new ConsoleThread();
        } catch (Exception e) {
            // Previously swallowed, which led to a NullPointerException on the start() calls below.
            System.out.println("Server start failed: " + e.getMessage());
            e.printStackTrace();
            return;
        }
        acceptor = newAcceptor;
        responder = newResponder;
        consoleThread = newConsole;
        responder.start();
        acceptor.start();
        consoleThread.start();
    }

    /** Asks the worker threads to stop; does nothing for threads that were never created. */
    public void shutdown() {
        if (responder != null) {
            responder.shutdown();
        }
        if (acceptor != null) {
            acceptor.shutdown();
        }
    }

    public static void main( String[] args ) {
        try {
            // NOTE(review): the port is read from the SECOND argument; with fewer than two
            // arguments, or a non-numeric value, the default port is kept. Confirm that index 1
            // (rather than 0) is the intended position.
            Server.port = Integer.parseInt(args[1]);
        } catch (ArrayIndexOutOfBoundsException ignored) {
            // no port argument: keep the default
        } catch (NumberFormatException ignored) {
            // unparsable port argument: keep the default
        }
        Server server = new Server();
        server.start();
    }
}
package http;
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.nio.charset.*;
import java.util.*;
/**
 * Thread that accepts incoming connections on a non-blocking ServerSocketChannel and hands them
 * to the reader/writer thread through a shared list (guarded by synchronizing on the list).
 */
class AcceptThread extends Thread {
    private ServerSocketChannel ssc;
    private Selector connectSelector;
    private Selector readSelector;
    private LinkedList acceptedConnections;
    /** Channels accepted during the current pass, before they are published to the shared list. */
    private LinkedList tmpConns;
    // Set by shutdown() from another thread, hence volatile.
    private volatile boolean done=false;
    public static int tstConnCnt=0;
    public static int acceptCnt=0;
    private long startTS=0;

    /**
     * Opens the listening channel, binds it with a backlog of 225 and registers it for OP_ACCEPT.
     *
     * @param connectSelector selector used to wait for incoming connections
     * @param list            shared list that accepted channels are appended to
     * @param port            local port to listen on
     * @param readSelector    selector of the reader/writer thread (stored, not used here)
     * @throws Exception if the channel cannot be opened, bound or registered
     */
    public AcceptThread(Selector connectSelector, LinkedList list, int port, Selector readSelector) throws Exception {
        super("Acceptor");
        this.tmpConns = new LinkedList();
        this.connectSelector = connectSelector;
        this.readSelector = readSelector;
        this.acceptedConnections = list;
        ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        InetSocketAddress address = new InetSocketAddress(port);
        ssc.socket().bind(address, 225);
        ssc.register(this.connectSelector, SelectionKey.OP_ACCEPT);
    }

    /** Waits for connections and accepts them until shutdown() is called. */
    public void run() {
        long startTime = 0;
        int noneRdyCnt = 0;
        while (!done) {
            try {
                int numRdy = connectSelector.select();
                if (numRdy == 0) {
                    // A blocking select() that keeps returning 0 is spinning; if the 21st and
                    // 22nd empty returns arrive within a second, report it and back off.
                    noneRdyCnt++;
                    if (noneRdyCnt == 20) {
                        startTime = System.currentTimeMillis();
                    } else if (noneRdyCnt == 22) {
                        if ((System.currentTimeMillis() - startTime) < 1000) {
                            System.out.println("Accept select blocking failure");
                            Thread.sleep(500);
                            noneRdyCnt = 0;
                        }
                    }
                } else {
                    noneRdyCnt = 0;
                }
                acceptPendingConnections();
            } catch (InterruptedException ex) {
                System.out.println("AcceptThread main loop interrupted");
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    /**
     * Accepts every connection currently pending on the ready server channels and appends the
     * new channels to the shared list.
     *
     * @throws Exception if accept() fails; channels accepted before the failure stay in the
     *                   temporary list and are published by the next call
     */
    protected void acceptPendingConnections() throws Exception {
        Set readyKeys = connectSelector.selectedKeys();
        if (tstConnCnt == 0) {
            startTS = System.currentTimeMillis();
        }
        tstConnCnt += readyKeys.size();
        for (Iterator i = readyKeys.iterator(); i.hasNext(); ) {
            SelectionKey key = (SelectionKey) i.next();
            i.remove();
            ServerSocketChannel readyChannel = (ServerSocketChannel) key.channel();
            // In non-blocking mode accept() returns null when nothing is pending. Drain the
            // whole queue per pass instead of taking one connection per select(), and never
            // queue a null channel.
            SocketChannel incomingChannel;
            while ((incomingChannel = readyChannel.accept()) != null) {
                tmpConns.add(incomingChannel);
                acceptCnt++;
            }
        }
        if (tmpConns.isEmpty()) {
            return;
        }
        synchronized (acceptedConnections) {
            acceptedConnections.addAll(tmpConns);
        }
        //readSelector.wakeup(); //added for Wakeup model when non-blocking bug fixed
        tmpConns.clear();
    }

    /** Requests termination and interrupts the thread so a blocked select() returns. */
    public void shutdown() {
        done = true;
        interrupt();
    }
}
package http;
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.nio.charset.*;
import java.util.*;
/**
 * Thread that registers channels accepted by AcceptThread, reads one HTTP request from each and
 * parks the channels whose request is complete. When the static doBroadcast flag is set (by the
 * console thread) every parked channel is registered for OP_WRITE and then receives the canned
 * response before being closed.
 */
class ReadWriteThread extends Thread{
    private static final int READ_BUFFER_SIZE = 4096;
    // Set by shutdown() from another thread, hence volatile.
    private volatile boolean done=false;
    /** Maximum number of channels that can wait for a broadcast. */
    private static final int MAXCHANS = 1000;
    private SocketChannel[] chans = new SocketChannel[MAXCHANS];
    private int numChans = 0;
    public Selector readWriteSelector;
    /** Shared with the accept thread; every access synchronizes on the list. */
    private LinkedList acceptedConnections;
    private ByteBuffer readBuffer;
    private ByteBuffer respBuf = null;
    private Charset ascii;
    private CharsetDecoder asciiDecoder;
    private CharsetEncoder asciiEncoder;
    public static volatile boolean doBroadcast=false;
    private long connStartTime=0;
    private long rcvStartTime=0;
    private long rcvEndTime=0;
    private int readCnt=0;
    private int writeCnt=0;
    private int connCnt=0;
    private Set readyKeys = null;
    private static final String respString = "<html><body>Test Response</body></html>";

    /** Encodes the fixed HTTP response (headers plus body) into respBuf. */
    private void makeRespBufs() {
        StringBuffer sBuf = new StringBuffer();
        sBuf.append("HTTP/1.1 200 OK\r\nServer: Auctio WebCaster\r\nCache-control: no-cache\r\nCache-control: no-store\r\nPragma: no-cache\r\nExpires: 0\r\nContent-Type: text/html\r\n");
        sBuf.append("Content-Length: ");
        sBuf.append(respString.length());
        sBuf.append("\r\n\r\n");
        sBuf.append(respString);
        char[] ca = sBuf.toString().toCharArray();
        try {
            respBuf = asciiEncoder.encode(CharBuffer.wrap(ca, 0, ca.length));
        } catch (Exception e) {
            // Previously swallowed; without respBuf no response can ever be sent.
            System.out.println("makeRespBufs: cannot encode response: " + e.getMessage());
        }
    }

    /**
     * @param readSelector        selector used for both reads and writes
     * @param acceptedConnections list the accept thread appends new channels to
     */
    public ReadWriteThread(Selector readSelector, LinkedList acceptedConnections) throws Exception {
        super("Reader-Writer");
        this.readWriteSelector = readSelector;
        this.acceptedConnections = acceptedConnections;
        this.readBuffer = ByteBuffer.allocateDirect(READ_BUFFER_SIZE);
        ascii = Charset.forName("US-ASCII");
        asciiDecoder = ascii.newDecoder();
        asciiEncoder = ascii.newEncoder();
        makeRespBufs();
    }

    /** Main loop: polls the selector every 100 ms until shutdown() is called. */
    public void run() {
        long noneRdyCnt = 0;
        long startTime = 0;
        long lastTime = 0;
        while (!done) {
            try {
                readWriteSelector.select(100); //Use this instead of blocking call with wakeups due to bug 4850373
                readyKeys = readWriteSelector.selectedKeys();
                int keysReady = readyKeys.size();
                int acceptedConns;
                // The accept thread modifies this list under the same lock.
                synchronized (acceptedConnections) {
                    acceptedConns = acceptedConnections.size();
                }
                //The following if block is a work around for bug 4850373 (stops blocking) should it occur
                if (keysReady == 0 && acceptedConns == 0) {
                    noneRdyCnt++;
                    if (noneRdyCnt == 20) {
                        startTime = System.currentTimeMillis();
                    } else if (noneRdyCnt == 24) {
                        // Four idle select(100) calls should take about 400 ms.
                        if ((System.currentTimeMillis() - startTime) < 375) {
                            System.out.println("Read select blocking failure");
                            Thread.sleep(200);
                            noneRdyCnt = 0;
                        }
                    }
                } else {
                    noneRdyCnt = 0;
                }
                connCnt += acceptedConns;
                long delta = System.currentTimeMillis() - lastTime;
                if (delta > 5000 && connCnt > 0) {
                    lastTime = System.currentTimeMillis();
                    System.out.println(connCnt + " connections accepted");
                }
                if (acceptedConns > 0) {
                    registerNewChannels();
                }
                if (doBroadcast) {
                    System.out.println("Receive time: " + (this.rcvEndTime - this.rcvStartTime));
                    rcvStartTime = 0;
                    doBroadcast();
                    doBroadcast = false;
                }
                if (keysReady > 0) {
                    acceptPendingRequests();
                }
            } catch (InterruptedException ex) {
                System.out.println("ReadWriteThread main loop interrupted");
            } catch (Exception ex) {
                System.out.println("ReadWriteThread Exception: " + ex.getMessage());
                ex.printStackTrace();
            }
        }
    }

    /**
     * Takes every channel queued by the accept thread, switches it to non-blocking mode and
     * registers it for OP_READ with a fresh StringBuffer attachment that collects the request.
     */
    protected void registerNewChannels() throws Exception {
        SocketChannel[] newChans;
        synchronized (acceptedConnections) {
            newChans = new SocketChannel[acceptedConnections.size()];
            acceptedConnections.toArray(newChans);
            acceptedConnections.clear();
        }
        for (int i = 0; i < newChans.length; i++) {
            SocketChannel channel = newChans[i];
            channel.configureBlocking(false);
            //channel.socket().setTcpNoDelay(true);
            channel.register(readWriteSelector, SelectionKey.OP_READ, new StringBuffer());
        }
    }

    /** Dispatches each selected key to readRequest or sendResponse. */
    protected void acceptPendingRequests() throws Exception {
        for (Iterator i = readyKeys.iterator(); i.hasNext(); ) {
            SelectionKey key = (SelectionKey) i.next();
            i.remove();
            if (key.isReadable()) {
                readRequest(key);
            } else if (key.isWritable()) {
                writeCnt++;
                //System.out.println("writing "+writeCnt);
                if (writeCnt == readCnt) {
                    writeCnt = 0;
                    readCnt = 0;
                }
                this.sendResponse(key);
            }
        }
    }

    /** @return true if buf starts with "GET "; false for buffers shorter than four characters */
    private static final boolean isGet(char[] buf) {
        return buf.length >= 4 && buf[0] == 'G' && buf[1] == 'E' && buf[2] == 'T' && buf[3] == ' ';
    }

    /**
     * Extracts the request path from a request line: the characters after the method up to the
     * first '?' or space.
     *
     * @param buf request characters; the path is taken to start at index 4 for GET and at
     *            index 5 otherwise (presumably "POST " - confirm)
     * @return the path, or an empty string if the buffer ends before the path starts
     */
    public static String getServletName( char[] buf ) {
        StringBuffer sn = new StringBuffer();
        for (int i = (isGet(buf) ? 4 : 5); i < buf.length && buf[i] != '?' && buf[i] != ' '; i++) {
            sn.append(buf[i]);
        }
        return sn.toString();
    }

    /**
     * Reads available bytes from the key's channel, appends them to the request text kept in the
     * key's attachment and, when the request ends with a blank line, hands it to
     * handleCompletedRequest and cancels the key. The channel is closed on end of stream, when
     * the request exceeds 2048 characters, or on any error.
     */
    private void readRequest(SelectionKey key) throws Exception {
        readCnt++;
        if (rcvStartTime == 0) {
            rcvStartTime = System.currentTimeMillis();
        }
        SocketChannel incomingChannel = (SocketChannel) key.channel();
        char[] requestChars = null;
        int bytesRead = 0;
        try {
            readBuffer.clear();
            bytesRead = incomingChannel.read(readBuffer);
            if (bytesRead == -1) {
                key.cancel();
                incomingChannel.close();
                return;
            }
            readBuffer.flip();
            // Copy exactly the decoded characters rather than exposing the decoder's backing array.
            CharBuffer decoded = asciiDecoder.decode(readBuffer);
            requestChars = new char[decoded.remaining()];
            decoded.get(requestChars);
            StringBuffer requestString = (StringBuffer) key.attachment();
            requestString.append(requestChars);
            int len = requestString.length();
            if (len > 2048) {
                System.out.print("readRequest length exceeded 2048 - socket closed");
                key.cancel();
                incomingChannel.close();
                return;
            }
            requestChars = requestString.toString().toCharArray();
            //Normally Content-Length would also be used to determine when the receive is complete, but the following works for this test case
            boolean endsWithLfLf = (len >= 2)
                    && requestChars[len - 2] == '\n' && requestChars[len - 1] == '\n';
            boolean endsWithCrLfCrLf = (len >= 4)
                    && requestChars[len - 4] == '\r' && requestChars[len - 3] == '\n'
                    && requestChars[len - 2] == '\r' && requestChars[len - 1] == '\n';
            if (endsWithLfLf || endsWithCrLfCrLf) {
                handleCompletedRequest(requestChars, key, incomingChannel);
                key.cancel();
            }
        } catch (Exception re) {
            String msg = re.getMessage();
            // The message may be null; treat that like any other unexpected error instead of
            // failing inside the handler and leaving the channel open.
            if (msg == null || msg.indexOf("forcibly closed") < 0) {
                re.printStackTrace();
                System.out.println("readRequest " + re.getClass().getName() + " " + (msg != null ? msg : ""));
                if (requestChars != null) {
                    System.out.println("r: ");
                }
                System.out.println("\nbytesRead: " + bytesRead);
            }
            key.cancel();
            incomingChannel.close();
        }
    }

    /** Parks a channel until the next broadcast; channels beyond MAXCHANS are not stored. */
    private void waitForBroadcast( SocketChannel channel ) {
        rcvEndTime = System.currentTimeMillis();
        if (numChans == 0) {
            connStartTime = System.currentTimeMillis();
        }
        if (numChans >= MAXCHANS) {
            return;
        }
        chans[numChans++] = channel;
    }

    /**
     * Writes the canned response on the key's channel and closes it. If the write fails the key
     * is cancelled and the channel closed so the key is not selected again.
     */
    private void sendResponse( SelectionKey key ) {
        SocketChannel sChan = (SocketChannel) key.channel();
        try {
            boolean isConnected = sChan.isConnected();
            boolean isBlocking = sChan.isBlocking();
            if (!key.isWritable()) {
                System.out.println("Not writeable in sendResponse");
            }
            if (isConnected && !isBlocking) {
                respBuf.rewind();
                sChan.write(respBuf);
                key.cancel();
                sChan.close();
                if (respBuf.remaining() != 0) {
                    System.out.println("bytes remaining in sendResponse");
                }
            } else if (!isConnected) {
                System.out.println("sendReponse lost connection");
            } else {
                System.out.println("sendReponse blocking");
            }
        } catch (Exception e) {
            System.out.println("sendResponse exception");
            String errMsg = e.getMessage();
            if (errMsg != null && errMsg.indexOf("non-blocking socket operation could not be completed") >= 0) {
                System.out.println("sendResponse: non-blocking socket operation could not be completed");
            }
            if (errMsg == null) {
                System.out.println("sendResponse exception");
            } else if (errMsg.indexOf("forcibly closed") < 0) {
                System.out.println("sendResponse: exception=" + e.getMessage());
            }
            // Release the failed connection instead of leaving it registered for OP_WRITE.
            key.cancel();
            try {
                sChan.close();
            } catch (IOException ignored) {
                // best effort: the channel is being discarded anyway
            }
        }
    }

    /** Registers every parked channel for OP_WRITE, pausing 20 ms after each group of 50. */
    private void doBroadcast( ) {
        System.out.println("Connections accepted: " + AcceptThread.acceptCnt);
        AcceptThread.acceptCnt = 0;
        System.out.println("Connections read: " + readCnt);
        System.out.println("Doing broadcast of " + numChans + " channels");
        long broadcastStartTime = System.currentTimeMillis();
        try {
            for (int i = 0; i < numChans; ) {
                chans[i].register(readWriteSelector, SelectionKey.OP_WRITE);
                i++;
                if ((i % 50) == 0) {
                    Thread.sleep(20);
                }
            }
            numChans = 0;
            long delta = System.currentTimeMillis() - broadcastStartTime;
            System.out.println("Broadcast time: " + delta);
        } catch (Exception e) {
            System.out.println("doBroadcast Exception: " + e.getMessage());
        }
        connCnt = 0;
    }

    /**
     * Handles a complete request: anything other than "/testget" gets the error response and the
     * channel is closed; otherwise the channel is parked until the next broadcast.
     */
    protected void handleCompletedRequest(char[] request, SelectionKey key, SocketChannel channel) throws RequestException, IOException {
        String servletName = getServletName(request);
        if (servletName == null || !servletName.equals("/testget")) {
            this.sendError(channel);
            key.cancel();
            channel.close();
            return;
        }
        try {
            waitForBroadcast(channel);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("handleCompletedRequest Exception: " + e.getMessage());
        }
    }

    /** Writes a minimal 404 response; failures are ignored because the caller closes the channel. */
    protected void sendError(SocketChannel channel) {
        try {
            CharBuffer chars = CharBuffer.allocate(64);
            chars.put("HTTP/1.0 404 OK\n");
            chars.put("Connection: close\n");
            chars.put("Server: Not Specified\n");
            chars.put("\n");
            chars.flip();
            ByteBuffer buffer = ascii.newEncoder().encode(chars);
            channel.write(buffer);
        } catch (Exception ignored) {
            // best effort: the connection is closed by the caller right after this call
        }
    }

    /** Requests termination; the loop notices within one 100 ms select timeout. */
    public void shutdown() {
        done = true;
    }
}
package http;
import java.io.*;
//Used to initiate responses (users presses enter on the console)
/**
 * Console thread: every line entered on standard input (the user pressing Enter) sets
 * ReadWriteThread.doBroadcast so the server sends its responses. The thread ends when standard
 * input is closed or cannot be read.
 */
public class ConsoleThread extends Thread {
    private boolean done=false;

    public ConsoleThread() {
    }

    public void run() {
        // One reader for the lifetime of the thread; replaces the deprecated
        // DataInputStream.readLine() and the per-iteration stream construction.
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        while (!done) {
            System.out.println("Waiting for enter...");
            String line;
            try {
                line = console.readLine();
            } catch (IOException ioe) {
                System.out.println("ConsoleThread: cannot read from console: " + ioe.getMessage());
                done = true;
                continue;
            }
            if (line == null) {
                // End of input: stop instead of looping forever and re-triggering broadcasts.
                done = true;
                continue;
            }
            System.out.println("Starting Broadcast\n");
            ReadWriteThread.doBroadcast = true;
        }
    }
}
>

