Consuming a JMS queue from a multithreaded application

Hi

Thank you for reading my post.

I have an application which should consume messages from a queue.

My application is multithreaded, and what I want to know is:

- How can I ask the queue to remove a message only after my thread has finished working with it? That way, if my thread hits an exception after reading the message, the message will not be lost.

- As it is a multithreaded application, how can I ensure that a message is not delivered to two or more threads? Is there any way to consume the messages in a synchronized way?

Will synchronized access restrict performance?

Thanks

[640 byte] By [Legolas.wa] at [2007-11-27 3:30:14]
# 1

I have a multithreaded environment doing just what you asked about.

First I use client acknowledgement by setting:

session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

Then when your thread is done with the message you can remove it with:

//***********************************************************
public void removeMessage(Message message) {
    try {
        // Acknowledge the message to remove it. The cast is to Sun's
        // message class, which provides acknowledgeThisMessage().
        ((com.sun.messaging.jms.Message)message).acknowledgeThisMessage();
    } catch(JMSException ackFailed) {
        logger.error("Failed to acknowledge current message. "
            + "The error message is: " + ackFailed.getMessage());
    }
}
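To make the ordering explicit: the worker should acknowledge only after its own work has succeeded. Here is a minimal sketch of that pattern. It deliberately uses a hypothetical stand-in interface, Ackable, instead of the real JMS classes so that it compiles with the JDK alone; in real code the ack() call would be the acknowledge call shown above. The names Ackable, RecordingMessage, AckAfterWork and handle are assumptions made for this example and are not part of JMS.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for a JMS message; NOT part of the JMS API.
interface Ackable {
    String body();
    void ack(); // real code would call the broker's acknowledge method here
}

// Test double that only remembers whether ack() was called.
class RecordingMessage implements Ackable {
    private final String body;
    boolean acked = false;

    RecordingMessage(String body) { this.body = body; }

    public String body() { return body; }
    public void ack() { acked = true; }
}

class AckAfterWork {
    // Runs the work first and acknowledges only if it did not throw.
    // Returns true when the message was acknowledged.
    static boolean handle(Ackable message, Consumer<String> work) {
        try {
            work.accept(message.body());
        } catch (RuntimeException workFailed) {
            // Deliberately no ack: the message stays unacknowledged.
            return false;
        }
        message.ack();
        return true;
    }

    public static void main(String[] args) {
        RecordingMessage good = new RecordingMessage("order-1");
        RecordingMessage bad = new RecordingMessage("order-2");
        handle(good, body -> System.out.println("stored " + body));
        handle(bad, body -> { throw new IllegalStateException("db down"); });
        System.out.println(good.acked + " " + bad.acked);
    }
}
```

With a real broker in CLIENT_ACKNOWLEDGE mode, a message that is never acknowledged is redelivered once the session is recovered or closed. Note also that the standard Message.acknowledge() acknowledges every message the session has consumed so far, which is presumably why the vendor-specific per-message call is used above.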

For multi-threading, I have a thread pool with worker classes. If a worker is available, I pass the message to it and launch it with run() or start().

If you're using JDBC, you need a pool of connections for the workers too, or else create a new connection within each thread.

Works pretty well.
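For readers on Java 5 or later, the JDK's java.util.concurrent package already ships a pool, so a hand-written one is not strictly required. This is only a minimal sketch, assuming messages are plain strings and that process (a hypothetical method) is where the real work would go; PoolSketch and runAll are also made-up names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PoolSketch {
    // Hypothetical worker logic; a real worker would write to a table etc.
    static String process(String message) {
        return "done:" + message;
    }

    // Hands each message to a fixed pool of worker threads and returns
    // the results in the same order as the input messages.
    static List<String> runAll(List<String> messages, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String m : messages) {
                Callable<String> task = () -> process(m);
                pending.add(pool.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : pending) {
                results.add(f.get()); // waits for that task to finish
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // no new tasks; idle workers exit
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(Arrays.asList("a", "b", "c"), 2));
    }
}
```

The pool size plays the same role as the maximum number of active threads; anything submitted beyond it simply waits in the executor's internal queue.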

cferris84a at 2007-7-12 8:33:16 > top of Java-index,Enterprise & Remote Computing,Enterprise Technologies...
# 2

Hi

Thank you very much for your reply.

Can you please give me more details about your worker threads and thread pool?

Is your thread pool in a J2SE application or in a J2EE application?

I need to consume messages from a J2SE application.

One more request: can you please share some of your code with me?

My email is legolas.w@ gmail.com

thanks.

Legolas.wa at 2007-7-12 8:33:16
# 3

This is running in a J2DK environment. For my thread pool, I just copied one from a Sun tutorial on multi-threading and then modified it. I created a separate package with three classes. Here is my thread pooling package. I basically use a Vector to hold the threads when they are not working, and you can set a limit on the maximum number of active threads.

package threadpooling;

/**
 * @author cferris
 *
 * This class simply wraps the Runnable task for the
 * worker thread together with other needed
 * parameters like salon id and source.
 */
public class TaskWrapper {

    // Define the needed instance fields.
    public Runnable taskAtHand = null;
    public String taskType = "";

    //***********************************************************
    // Constructor taking just the runnable task.
    //***********************************************************
    public TaskWrapper(Runnable task) {
        // Just set the runnable
        taskAtHand = task;
        taskType = "WorkerThread";
    } // of constructor

    //***********************************************************
    // Overloaded constructor taking the task and its type.
    //***********************************************************
    public TaskWrapper(Runnable task, String taskType) {
        // Just set the instance fields
        taskAtHand = task;
        this.taskType = taskType;
    } // of overloaded constructor
}

package threadpooling;

/**

*

* @author cferris

*

* This class is for tracking the number of

* active threads so we don't go over the set

* limit. This class basically synchronizes

* the methods that update the active thread

* counter.

*

*/

import java.util.*;

public class ThreadPool {

// Define the instance fields.

//First define instance fields for the

//management of the thread pools - pm.

protected Vector poolOfThreads = new Vector();

private static LinkedList taskQueue = new LinkedList();

private static int poolSize = 0;

private static boolean pmAvailable = true;

private static boolean daemon = false;

public static volatile boolean shutDown = false; // volatile so the workers see the change

//Instance field for the management of

//the threads - tm.

private static int activeThreads = 0;

private static int maxReached = 0;

private static int numTimesReached = 0;

private static boolean tmAvailable = true;

// Define constants

private static final int MAX_LIMIT_DEFAULT = 32;

//***********************************************************

// The constructor.

//***********************************************************

public ThreadPool() {

this(MAX_LIMIT_DEFAULT, false);

} // of constructor

//***********************************************************

// Overload the constructor with max number of active jobs.

//***********************************************************

public ThreadPool(int poolSize, boolean daemon) {

this.poolSize = poolSize;

this.daemon = daemon;

poolOfThreads = new Vector();

taskQueue = new LinkedList();

// Load up and initialize the threads.

for (int i=0; i < poolSize; i++) {

Thread thread = new WorkerThread(this, i);

thread.setDaemon(daemon);

thread.start();

poolOfThreads.add(thread);

}

} // of overloaded constructor

//***********************************************************

// Overload this method to allow just the runnable task.

//***********************************************************

public void assignWork(Runnable work) {

// Create the wrapper

TaskWrapper wrapper = new TaskWrapper(work, "A task");

assignWork(wrapper);

} // of overloaded assignWork

//***********************************************************

// Push a unit of work (work thread) onto the task queue.

//***********************************************************

public synchronized void assignWork(TaskWrapper work) {

// First increment the active thread count.

increment();

// Wait until we can access the linked list

try {

while (!pmAvailable)

wait();

} catch(InterruptedException e) { }

pmAvailable = false;

notifyAll();

taskQueue.addLast(work);

pmAvailable = true;

notifyAll();

} // of assignWork

//***********************************************************

// Get the next unit of work (work thread).

//***********************************************************

public synchronized TaskWrapper getNextWorkAssignment() {

// Wait until we can access the linked list

try {

while (!pmAvailable)

wait();

} catch(InterruptedException e) { }

pmAvailable = false;

notifyAll();

TaskWrapper nextTask;

if (!taskQueue.isEmpty())

nextTask = (TaskWrapper)taskQueue.removeFirst();

else

nextTask = null;

pmAvailable = true;

notifyAll();

return nextTask;

} // of getNextWorkAssignment

//***********************************************************

// Return the number of tasks waiting on the work queue.

//***********************************************************

public synchronized int workWaiting() {

// Wait until we can access the linked list

try {

while (!pmAvailable)

wait();

} catch(InterruptedException e) { }

pmAvailable = false;

notifyAll();

int qSize = taskQueue.size();

pmAvailable = true;

notifyAll();

return qSize;

} // of workWaiting

//***********************************************************

// Wait until all the tasks have been completed.

//***********************************************************

public void waitForCompletion() {

int prev = 0;

// Continue checking until all the threads have

// completed and been removed.

while(!poolOfThreads.isEmpty()) {

// Lets record the difference

if (poolOfThreads.size() != prev) {

System.out.println("waiting for " + poolOfThreads.size() + " to complete");

prev = poolOfThreads.size();

}

// Loop through the thread pool and check each thread.

// Walk backwards so that removing an entry does not skip the next one.

for (int t = poolOfThreads.size() - 1; t >= 0; t--)

if (!((WorkerThread)poolOfThreads.get(t)).working)

// This thread is done so remove it.

poolOfThreads.remove(t);

} // of while

} // waitForCompletion

//***********************************************************

// Set the max number of allowed threads.

//***********************************************************

public synchronized void setPoolSize(int newLimit) {

// just set the limit.

// Check if available

while (!tmAvailable) {

try {

wait();

}

catch(InterruptedException ie) {

System.exit(5);

}

}

// Ok, lock the object while we update it.

tmAvailable = false;

notifyAll();

poolSize = newLimit;

// Reset the max counters/trackers too

maxReached = 0;

numTimesReached = 0;

// Create additional threads in the pool if needed

if (poolSize > poolOfThreads.size())

// Load up and initialize the threads.

for (int i = poolOfThreads.size(); i < poolSize; i++) {

Thread thread = new WorkerThread(this, i);

thread.start();

poolOfThreads.add(thread);

}

// Done, now release the object.

tmAvailable = true;

notifyAll();

} // of setPoolSize.

//***********************************************************

// The method to increment thread counter.

//***********************************************************

private synchronized void increment() {

// Check if available

while (!tmAvailable) {

try {

wait();

}

catch(InterruptedException ie) {

// logger.fatal("Synchronized error in ThreadPool ");

// logger.fatal("Error message is: " + ie.getMessage());

System.exit(5);

}

}

// Ok, lock the object while we update it.

tmAvailable = false;

notifyAll();

activeThreads++;

// Check if this is the highest number of threads

// that has been active, if not then set so.

if (activeThreads > maxReached)

maxReached = activeThreads;

if (activeThreads == poolSize)

numTimesReached++;

// Done, now release the object.

tmAvailable = true;

notifyAll();

} // of increment

//***********************************************************

// The method to decrement thread counter.

//***********************************************************

public synchronized void decrement() {

// Check if available

while (!tmAvailable) {

try {

wait();

}

catch(InterruptedException ie) {

// logger.fatal("Synchronized error in ThreadPool ");

// logger.fatal("Error message is: " + ie.getMessage());

System.exit(5);

}

}

// Ok, lock the object while we update it.

tmAvailable = false;

notifyAll();

activeThreads--;

// Done, now release the object.

tmAvailable = true;

notifyAll();

} // of decrement

//***********************************************************

// A method to determine if the maximum number of active

// threads has been reached.

//***********************************************************

public synchronized boolean limitNotReached() {

boolean limit = false;

// Check if available

while (!tmAvailable) {

try {

wait();

}

catch(InterruptedException ie) {

// logger.fatal("Synchronized error in ThreadPool.");

// logger.fatal("Error message is: " + ie.getMessage());

System.exit(5);

}

}

// Ok, lock the object while we update it.

tmAvailable = false;

notifyAll();

if (activeThreads < poolSize)

limit = true;

// Done, now release the object.

tmAvailable = true;

notifyAll();

return limit;

} // of limitNotReached

//***********************************************************

// A method to return the number of active threads.

//***********************************************************

public synchronized int currentNum() {

// Check if available

while (!tmAvailable) {

try {

wait();

}

catch(InterruptedException ie) {

// logger.fatal("Synchronized error in ThreadPool");

// logger.fatal("Error message is: " + ie.getMessage());

System.exit(5);

}

}

// Ok, lock the object while we update it.

tmAvailable = false;

notifyAll();

int numberOf = activeThreads;

// Done, now release the object.

tmAvailable = true;

notifyAll();

return numberOf;

} // of currentNum

} // of ThreadPool
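A note on the counter methods above: every one of them is already synchronized, so with a single pool instance no other thread can ever observe tmAvailable or pmAvailable as false, and the wait loops never actually wait. One possible simplification, sketched here under the assumption that only the active count and its high-water mark matter, is to use AtomicInteger. The class name ActiveCounter is hypothetical; the method names just mirror the ones above.

```java
import java.util.concurrent.atomic.AtomicInteger;

class ActiveCounter {
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicInteger maxReached = new AtomicInteger();
    private final int limit;

    ActiveCounter(int limit) { this.limit = limit; }

    // Called when a task is handed to the pool.
    int increment() {
        int now = active.incrementAndGet();
        // Record the high-water mark without taking a lock.
        maxReached.accumulateAndGet(now, Math::max);
        return now;
    }

    // Called when a task finishes.
    int decrement() { return active.decrementAndGet(); }

    boolean limitNotReached() { return active.get() < limit; }

    int currentNum() { return active.get(); }

    int maxReached() { return maxReached.get(); }

    public static void main(String[] args) {
        ActiveCounter counter = new ActiveCounter(2);
        counter.increment();
        counter.increment();
        System.out.println(counter.limitNotReached());
        counter.decrement();
        System.out.println(counter.currentNum() + " " + counter.maxReached());
    }
}
```

As in the original, checking limitNotReached() and then assigning work are two separate steps, so two callers can still both pass the check at the same moment.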

package threadpooling;

/**

*

* @author cferris

*

* This is the worker thread that receives the

* tasks or units of work; these threads make up

* the thread pool.

*

*/

public class WorkerThread extends Thread {

// Define needed instance fields.

public volatile boolean working = true;

public int id = 0;

private int pauseFactor = 1;

// The thread pool that this object belongs to.

public ThreadPool owner;

//*********************************************************

// Define the constructor for the separate thread.

//*********************************************************

public WorkerThread(ThreadPool pooler, int id) {

owner = pooler;

this.id = id;

} // of constructor

//*********************************************************

// Wait for work or for system shutdown.

//*********************************************************

public void run() {

// Define the work object.

TaskWrapper unitOfWork = null;

// Wait for work or until system is shutting down.

do {

unitOfWork = owner.getNextWorkAssignment();

if (unitOfWork != null) {

// First set the thread name.

String taskName = unitOfWork.taskType + "-" + id;

setName(taskName);

// System.out.println("Starting thread - " + getName());

unitOfWork.taskAtHand.run();

// Done, now adjust tracker and reset the pause factor.

owner.decrement();

pauseFactor = 1;

}

else {

// No work so pause a bit

try {

sleep(150 * pauseFactor);

} catch (InterruptedException e) { }

// If consecutively pausing because of no work then

// increase the pause time up to a point.

if (pauseFactor < 49)

pauseFactor++;

}

} while (!owner.shutDown);

// Set indicator this thread in the pool is done.

working = false;

} // of run

}
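The worker above polls the queue and sleeps when it finds nothing. An alternative, sketched here with the JDK's LinkedBlockingQueue, lets idle workers block until work arrives, so no pause factor is needed. The STOP marker, the class name BlockingWorkerSketch and the runTasks method are assumptions made for this example, not part of the package above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingWorkerSketch {
    // Hypothetical shutdown marker placed on the queue, one per worker.
    private static final Runnable STOP = () -> { };

    // Starts `workers` threads, queues `tasks` trivial tasks for them,
    // and returns how many tasks actually ran.
    static int runTasks(int workers, int tasks) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicInteger completed = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    while (true) {
                        Runnable next = queue.take(); // blocks until work arrives
                        if (next == STOP) {
                            return;
                        }
                        next.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "worker-" + i);
            threads[i].start();
        }
        for (int t = 0; t < tasks; t++) {
            queue.add(completed::incrementAndGet);
        }
        for (int i = 0; i < workers; i++) {
            queue.add(STOP); // queued after all tasks, so tasks run first
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(3, 20));
    }
}
```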

To use the thread pool, just create a static instance of the ThreadPool in your main class:

private static ThreadPool threadPool = new ThreadPool(getMaxThreads(), useDaemon());

Then, to launch a new thread, use this:

if (runOnSeperateThread()) {
    WriteTransThread task = new WriteTransThread(message);
    // Add to the thread pool.
    threadPool.assignWork(new TaskWrapper(task, "my thread"));
}

private boolean runOnSeperateThread() {
    // First check if multi-threading is turned on
    // and if there are open threads.
    if (multiThreadingOn && threadPool.limitNotReached())
        // Yes, there are open threads.
        return true;

    // Check if we can wait for an open thread.
    if (!multiThreadingOn || !waitForOpenThread)
        // No multi-threading or waiting allowed.
        return false;

    // OK, wait for an open thread, for up to 90 seconds
    // (180 tries, half a second apart).
    int j = 0;
    do {
        try {
            // Wait 1/2 a second.
            Thread.sleep(500);
        } catch(InterruptedException ignore) { }

        // Check again for an open thread.
        if (threadPool.limitNotReached())
            // Ok to run on separate thread.
            return true;

        // Increment counter and wait again.
        j++;
    } while(j < 180);

    // If we got this far then we've waited long enough.
    logger.warn("Reached limit on waiting for an open thread in the pool");
    return false;
} // of runOnSeperateThread
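The wait-for-an-open-thread loop above polls every half second. One alternative sketch uses java.util.concurrent.Semaphore, whose tryAcquire with a timeout blocks until a permit is free or the time runs out. SlotGate, tryEnter and release are hypothetical names for this example; a permit here stands for one open thread in the pool.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class SlotGate {
    private final Semaphore slots;

    SlotGate(int maxActive) {
        slots = new Semaphore(maxActive);
    }

    // Waits up to timeoutMillis for a free slot. True means the caller
    // may start a task and must call release() when the task finishes.
    boolean tryEnter(long timeoutMillis) {
        try {
            return slots.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void release() {
        slots.release();
    }

    public static void main(String[] args) {
        SlotGate gate = new SlotGate(1);
        System.out.println(gate.tryEnter(100)); // first caller gets the slot
        System.out.println(gate.tryEnter(100)); // second caller times out
        gate.release();
    }
}
```

Unlike the check-then-assign approach, acquiring the permit and claiming the slot happen in one step, so two callers cannot both take the last slot.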

The WriteTransThread is a class that will extract the message body and write out its data to a table.

Last, before you end your program, make sure all the threads have completed by calling the following.

//***********************************************************
// If multi-threading is activated then this method will
// wait for the threads to complete before returning. The
// max wait time is 300 secs or 5 minutes.
//***********************************************************
private void waitForThreadsToComplete(String message) {
    // First check for multi-threading.
    if (prop.useMultiThread() && (threadPool.currentNum() > 0)) {
        int limit = 0;
        // Some threads are still active, wait
        logger.info("Waiting for threads to complete before " + message
            + " current thread count is " + threadPool.currentNum());
        do {
            limit++;
            try {
                Thread.sleep(500); // Sleep 1/2 a sec.
            } catch(InterruptedException ie) { } // Ignore the error for now.
            // Max wait is 300 sec or 5 min (600 sleeps of 1/2 sec).
        } while ((threadPool.currentNum() > 0) && (limit < 600));
    }
} // of waitForThreadsToComplete
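If the pool is a java.util.concurrent ExecutorService rather than the hand-written class, the wait-with-a-limit logic above maps onto shutdown() followed by awaitTermination(). A minimal sketch follows; the 5-minute figure mirrors the method above, and DrainSketch and drain are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class DrainSketch {
    // Stops accepting new tasks and waits for the running ones.
    // Returns true if everything finished within the limit.
    static boolean drain(ExecutorService pool, long limit, TimeUnit unit) {
        pool.shutdown();
        try {
            if (pool.awaitTermination(limit, unit)) {
                return true;
            }
            pool.shutdownNow(); // waited long enough: interrupt what is left
            return false;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("working"));
        System.out.println(drain(pool, 5, TimeUnit.MINUTES));
    }
}
```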

cferris84a at 2007-7-12 8:33:16
# 4

Thank you very much for sharing your code, but there are some small errors in it that prevent compilation.

Can you please give me some help with them?

salonID is not defined in TaskWrapper, but it is used in ThreadPool.

decrement and increment are called with a parameter, but they accept no parameters.

Thank you

Legolas.wa at 2007-7-12 8:33:16
# 5
Hi

Any comment for me?

Thanks
Legolas.wa at 2007-7-12 8:33:16