I'm having messages redelivered unexpectedly. Why?
I am using iPlanet Message Queue for Java version 2.0 for a messaging
application. I have a publisher and a subscriber who subscribe to the same
TOPIC. I am using Publish-Subscribe messaging . The publisher publishes a
message to the topic and the subscriber picks it up. I have set the
message acknowledgement on both client and producer sessions as
AUTO_ACKNOWLEDGE, but even after getting the message on the client side,
if i re-run the client application, it gets the same message. In your
Administrator's Guide it is mentioned that if the session is set to
AUTO_ACKNOWLEDGE, then after getting the message, the client sends an
acknowledgment to the server and the server in turn deletes the
message. But when i run the client again, the message still
exists on the server and the client gets the message even though the
producer hasnt produced anything new. I would appreciate if you could
suggest how to overcome this problem.
There are two possibilities we can think of that would cause this to happen:
1. You used transacted session in your subscriber program:
TopicSession session = topicConnection.createTopicSession (true,
Session.AUTO_ACKNOWLEDGE);
If your subscriber has statement similar to the above one ('true' in the first
parameter), please change the first parameter from 'true' to 'false':
TopicSession session = topicConnection.createTopicSession (false,
Session.AUTO_ACKNOWLEDGE);
2. You exited the subscriber inside the message listener.If this is true,
please use a separate thread to close the connection and exit the program.
Here is a simple example that shows how to exit your program from the message
listener.
/*
* %W% %E%
*
* Copyright 2000 Sun Microsystems, Inc. All Rights Reserved
* SUN PROPRIETARY/CONFIDENTIAL
* Use is subject to license terms.
*
*/
package test.eng.jmsclient;
import javax.jms.*;
import com.sun.messaging.jmq.jmsclient.*;
import java.util.*;
public class QueueConsumer implements MessageListener, ExceptionListener {
QueueConnection conn = null;
QueueSession session = null;
Queue queue = null;
boolean closeConnection = false;
int receiveCount = 0;
long startTime = 0;
long endTime = 0;
public QueueConsumer() {
}
public void onException(JMSException e) {
System.out.println ("Exception listener is called ...");
notifyClose();
}
public void start () throws Exception {
com.sun.messaging.QueueConnectionFactory factory = new com.sun.messaging.QueueConnectionFactory();
//create connection
conn = factory.createQueueConnection();
conn.setExceptionListener(this);
//create session
session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue("testQueue");
QueueReceiver receiver = session.createReceiver( queue );
receiver.setMessageListener( this );
//start receiving messages
conn.start();
waitForClose();
}
public synchronized void waitForClose() throws JMSException {
while ( closeConnection == false ) {
try {
wait();
} catch (Exception e) {
;
}
}
conn.close();
}
public synchronized void notifyClose() {
closeConnection = true;
notifyAll();
}
public void onMessage ( Message message ) {
String prop = null;
receiveCount ++;
if ( receiveCount == 1 ) {
startTime = System.currentTimeMillis();
}
try {
prop = message.getStringProperty("STOP");
if ( prop != null ) {
endTime = System.currentTimeMillis();
long diff = endTime - startTime;
double msgsSec = (receiveCount * 1000.0) /diff ;
System.out.println ("msgs/sec: " + msgsSec);
receiveCount = 0;
notifyClose();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main (String args[]) {
QueueConsumer sample = new QueueConsumer();
try {
sample.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}