Read Object Throw java.io.EOFException
I write a Client/Server program, I use QuickServer for the server.
The client is write by my self. And in the mostly time, it run well.but after a while. it Throw a exception:
2006-07-10 17:49:01,000 [ReceiverDaemon] ERROR - Socket I/O error:
java.io.EOFException
at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2502)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1267)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:339)
at com.photon.netview.net.NVLink.readObject(NVLink.java:59)
at com.photon.netview.net.Connection.readNextPacketInternal(Connection.java:336)
at com.photon.netview.net.Connection.run(Connection.java:90)
at java.lang.Thread.run(Thread.java:595)
2006-07-10 17:49:01,000 [ReceiverDaemon] ERROR - Socket I/O error:
java.io.EOFException
at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2502)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1267)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:339)
at com.photon.netview.net.NVLink.readObject(NVLink.java:59)
at com.photon.netview.net.Connection.readNextPacketInternal(Connection.java:336)
at com.photon.netview.net.Connection.run(Connection.java:90)
at java.lang.Thread.run(Thread.java:595)
2006-07-10 17:49:01,000 [ReceiverDaemon] ERROR - Socket I/O error:
java.io.EOFException
at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2502)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1267)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:339)
at com.photon.netview.net.NVLink.readObject(NVLink.java:59)
at com.photon.netview.net.Connection.readNextPacketInternal(Connection.java:336)
at com.photon.netview.net.Connection.run(Connection.java:90)
at java.lang.Thread.run(Thread.java:595)
My code as follow:
package com.photon.netview.net;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.log4j.Logger;
import com.photon.netview.client.utils.StrPicker;
import com.photon.netview.client.utils.SystemConfig;
import com.photon.netview.dbclient.exbeans.ExUserInfo;
import com.photon.netview.protocol.NVProtocol;
import com.photon.netview.protocol.ProtocolChecker;
import com.photon.netview.protocol.bean.CommandCode;
import com.photon.netview.protocol.bean.ErrorCode;
import com.photon.netview.protocol.bean.LoginBean;
import com.photon.netview.protocol.client.LoginReq;
import com.photon.netview.protocol.client.ShankhandReq;
import com.photon.netview.protocol.message.nvexception.NVTimeOutException;
import com.photon.netview.protocol.message.nvexception.ProtocolException;
import com.photon.netview.protocol.server.ShankhandAck;
publicclass Connectionimplements java.lang.Runnable{
/**
* Logger for this class
*/
privatestaticfinal Logger logger = Logger.getLogger(Connection.class);
publicstaticfinalint UNLOGIN = 0;
publicstaticfinalint LOGIN = 1;
publicstaticfinalint LOGOUT = 2;
privatestatic Connection connection;
/** Packet listener thread for Asyncronous comms. */
private Thread rcvThread =null;
private NVLink link =null;
private Thread shkThread =null;
ShankhandHandler shkHandler =null;
privateint shkInterval = 30000;
privateint state = UNLOGIN;
privateint errCount;
privatestaticint MAX_IOERROR = 4;
/** response message queue */
private List<NVProtocol> ackQueue;
/** Object to use to lock reading and writing ack queue */
privatefinal Object ackWait =new Object();
/** Object to use to lock reading and writing command queue */
// private final Object cmdWait = new Object();
private List<INotifyListener> ntfListeners;
privatevolatilelong lastMsgTime;
privateint timeoutDelay = 1000;
privateint timeout = 1000;
private SequenceNumberScheme seqNumScheme =new DefaultSequenceScheme();
private Connection(String host,int port)throws UnknownHostException{
super();
this.link =new TCPLink(host, port);
initSyncComms();
}
publicvoid run(){
try{
while (state != LOGOUT){
NVProtocol pro =null;
try{
pro = readNextPacketInternal();
errCount = 0;
}catch (ProtocolException e){// unregisted protocol
logger.error(e.getMessage());
errCount++;
if (errCount <= MAX_IOERROR)
continue;
else{
link.close();
break;
}
}catch (IOException e){
if (!(einstanceof SocketTimeoutException))
logger.error(StrPicker
.getString("error.net.socketerror"), e);
continue;
}
if ((pro.getCommand() == CommandCode.CM_LOGIN_ACK)
|| (state != UNLOGIN))
insert2Queue(pro);
if (null == shkThread){
shkHandler =new ShankhandHandler();
shkHandler.setInterval(shkInterval);
shkThread =new Thread(shkHandler,"Shankhand Thread");
shkThread.start();
}
}
}catch (Exception x){
logger.error(StrPicker.getString("error.net.fatalrecverr"), x);
setState(UNLOGIN);
}finally{
rcvThread =null;
shkHandler.stopThread();
}
}
/**
* insert the received protocol packet to queue
*
* @param pro
*/
privatevoid insert2Queue(NVProtocol pro){
if (logger.isDebugEnabled()){
logger.debug("insert queue.");
logger.debug(pro.toDebugString());
}
if (ProtocolChecker.isAck(pro)){
synchronized (ackWait){
ackQueue.add(0, pro);
ackWait.notify();
if (logger.isDebugEnabled()){
logger.debug("ack notified;"+pro.getSerialNO());
}
}
}else{
// synchronized (cmdWait) {
// cmdQueue.add(0, pro);
// cmdWait.notify();
Iterator<INotifyListener> itr = ntfListeners.iterator();
while (itr.hasNext()){
itr.next().handleNotify(this, pro);
}
// }
}
}
protected NVProtocol waitForResponse(NVProtocol req,long timeout)
throws NVTimeOutException{
long currTime =new Date().getTime();
long tm = timeout;
synchronized (ackWait){
while (true){
Iterator<NVProtocol> it = ackQueue.listIterator();
while (it.hasNext()){// find ack message
NVProtocol rsp = it.next();
if (ProtocolChecker.isAck(req, rsp)){
ackQueue.remove(rsp);
return rsp;
}
}
try{
if (logger.isDebugEnabled()){
logger.debug("begin wait protocol ack:");
logger.debug(req.toDebugString());
}
ackWait.wait(2000);
}catch (InterruptedException e){
}
long diff = (new Date().getTime()) - currTime;
tm = tm - diff;
if (tm < timeoutDelay)// prevent wait time is too short
tm = timeoutDelay;
if (diff >= (timeout + timeoutDelay)){
thrownew NVTimeOutException(StrPicker
.getString("error.protocol.timeout"));
}
}
}
}
privatevoid initSyncComms(){
this.ackQueue =new LinkedList<NVProtocol>();
// this.cmdQueue = new LinkedList<NVProtocol>();
this.ntfListeners =new LinkedList<INotifyListener>();
}
/**
* Create the receiver thread if asynchronous communications is on, does
* nothing otherwise.
*/
privatevoid createRecvThread(){
rcvThread =new Thread(this,"ReceiverDaemon");
rcvThread.setDaemon(true);
}
/**
* Method to open the link to the nvserver. This method will connect the
* underlying nvserver link object if necessary and reset the sequence
* numbering scheme to the beginning.
*
* @throws IOException
*
* @throws java.io.IOException
* if an i/o error occurs while opening the connection.
*/
protectedvoid openLink()throws IOException{
if (!this.link.isConnected()){
try{
this.link.open();
this.link.setTimeout(timeout);
}catch (IOException e){
logger.error(StrPicker.getString("error.net.linkfaile"), e);
throw (e);
}
if (this.seqNumScheme !=null)
this.seqNumScheme.reset();
}
}
public NVProtocol sendRequest(NVProtocol req)
throws java.net.SocketTimeoutException, java.io.IOException{
if (null == rcvThread)
createRecvThread();
openLink();
if (!rcvThread.isAlive())
rcvThread.start();
req.setSerialNO(seqNumScheme.nextNumber());
try{
link.writeObject(req);
if (logger.isDebugEnabled()){
logger.debug(req.toDebugString());
}
}catch (IOException e){
logger.error(StrPicker.getString("error.net.socketerror"));
returnnull;
}
NVProtocol ret =null;
try{
if (logger.isDebugEnabled()){
logger.debug("wait for response");
}
ret = waitForResponse(req, link.getTimeout());
lastMsgTime =new Date().getTime();
}catch (NVTimeOutException e){
logger.error(e.getMessage());
}catch (UnsupportedOperationException e){
logger.error(StrPicker.getString("error.jdk.verlower"), e);
}
return ret;
}
public ExUserInfo login(String usr,char[] pwd)throws SocketTimeoutException, IOException{
LoginReq loginReq =new LoginReq();
LoginBean bean =new LoginBean();
bean.setUsername(usr);
bean.setPassword(pwd);
loginReq.setDataBean(bean);
NVProtocol resp = sendRequest(loginReq);
if (resp.getErrorCode() == ErrorCode.CM_SUCCESS){
ExUserInfo ret = (ExUserInfo) resp.getDataBean();
state =null != ret ? LOGIN : UNLOGIN;
if (logger.isDebugEnabled())
logger.debug(usr+" login success.");
return ret;
}
returnnull;
}
publicboolean shankHand(){
ShankhandReq req =new ShankhandReq();
try{
ShankhandAck ack = (ShankhandAck) sendRequest(req);
return ack.getCommand() == ErrorCode.CM_SUCCESS;
}catch (Exception e){
returnfalse;
}
}
publicvoid sendResponse(NVProtocol resp)throws java.io.IOException{
try{
if (logger.isDebugEnabled())
logger.debug("Send Response:\n"+resp.toDebugString());
link.writeObject(resp);
if (logger.isDebugEnabled())
logger.debug("Send Response:"+resp.getCommand()+" success");
}catch (java.net.SocketTimeoutException e){
}
}
/**
*
* @return
* @throws java.io.IOException
* @throws ProtocolException
*/
private NVProtocol readNextPacketInternal()throws java.io.IOException,
ProtocolException{// This function throw out EOFExceptionNVProtocol ret = null;
try{
Object obj;
obj = link.readObject();
if (ProtocolChecker.isValid(obj)){
ret = (NVProtocol) obj;
//if (logger.isDebugEnabled())
//logger.debug("Receive a objec:\n"+ret.toDebugString());
}
else
thrownew ProtocolException(StrPicker
.getString("error.protocol.unknown"));
}catch (ClassNotFoundException e){
thrownew ProtocolException(StrPicker
.getString("error.protocol.unknown"));
}
return ret;
}
/**
* @return Returns the state.
*/
publicsynchronizedint getState(){
return state;
}
/**
* @param state
*The state to set.
*/
publicsynchronizedvoid setState(int state){
this.state = state;
}
publicstatic Connection create(SystemConfig cfg)throws Exception{
if (null == connection
|| !cfg.getServerip().equals(
((TCPLink) connection.link).getHost())
|| cfg.getPort() != ((TCPLink) connection.link).getPort()){
connection =new Connection(cfg.getServerip(), cfg.getPort());
connection.timeout = cfg.getTimeout();
connection.shkInterval = cfg.getShankhdInterval();
cfg =null;
}
return connection;
}
publicstatic Connection instance(){
return connection;
}
publicvoid addNotifyListener(INotifyListener listener){
this.ntfListeners.add(listener);
}
publicvoid removeNotifyListener(INotifyListener listener){
this.ntfListeners.remove(listener);
}
/**
* @return Returns the lastMsgTime.
*/
publiclong getLastMsgTime(){
return lastMsgTime;
}
publicboolean isLink(){
returnnull != link ? link.isConnected() :false;
}
}
Please help me,
If posible. please tell me the possible reason which cause this exception.
Thanks for your help.

