# 3
This is running in a J2DK environment. For my thread pool, I just copied one from a Sun tutorial on multi-threading then modified it. I creaed a seperate package with three classes. Here is my thread pooling package. I basically use a Vector to hold threads when they are not working. And you can set a limit as to the max number of active threads.
package threadpooling;
/**
*
* @author cferris
*
* This class is simple to wrap the runnable
* thread for the worker thread around other
* needed parameters like salon id and source.
*
*/
public class TaskWrapper {
// Define the needed instance fields.
public Runnable taskAtHand = null;
public String taskType = "";
//***********************************************************
// Overloaded constructor.
//***********************************************************
public TaskWrapper(Runnable task) {
// Just set the runnable
taskAtHand = task;
taskType = "WorkerThread";
} // of overloaded contructor
//***********************************************************
// Overloaded constructor.
//***********************************************************
public TaskWrapper(Runnable task, String taskType) {
// Just set the instance fields
taskAtHand = task;
this.taskType = taskType;
} // of overloaded contructor
}
package threadpooling;
/**
*
* @author cferris
*
* This class is for tracking the number of
* active threads so we don't go over the set
* limit. This class basically synchronizes
* the methods that update the active thread
* counter.
*
*/
import java.util.*;
public class ThreadPool {
// Define the instance fields.
//First define instance fields for the
//management of the thread pools - pm.
protected Vector poolOfThreads = new Vector();
private static LinkedList taskQueue = new LinkedList();
private static int poolSize = 0;
private static boolean pmAvailable = true;
private static boolean daemon = false;
publicstatic boolean shutDown = false;
//Instance field for the management of
//the threads - tm.
private static int activeThreads = 0;
private static int maxReached = 0;
private static int numTimesReached = 0;
private static boolean tmAvailable = true;
// Define constants
private static final int MAX_LIMIT_DEFAULT = 32;
//***********************************************************
// The constructor.
//***********************************************************
public ThreadPool() {
this(MAX_LIMIT_DEFAULT, false);
} // of constructor
//***********************************************************
// Overload the constructor with max number of active jobs.
//***********************************************************
public ThreadPool(int poolSize, boolean daemon) {
this.poolSize = poolSize;
this.daemon = daemon;
poolOfThreads = new Vector();
taskQueue = new LinkedList();
salonTracker = new Vector();
// Load up and initialize the threads.
for (int i=0; i < poolSize; i++) {
Thread thread = new WorkerThread(this, i);
thread.setDaemon(daemon);
thread.start();
poolOfThreads.add(thread);
}
} // of overloaded constructor
//***********************************************************
// Overload this method to allow just the runnable task.
//***********************************************************
public void assignWork(Runnable work) {
// Create the wrapper
TaskWrapper wrapper = new TaskWrapper(work, "A task");
assignWork(wrapper);
} // of overloaded assignWork
//***********************************************************
// Push a unit of work (work thread) on the the worker thread.
//***********************************************************
public synchronized void assignWork(TaskWrapper work) {
// First increament the active thread count.
increment();
// Wait until we can access the linked list
try {
if (!pmAvailable)
wait();
} catch(InterruptedException e) { }
pmAvailable = false;
notifyAll();
taskQueue.addLast(work);
pmAvailable = true;
notifyAll();
} // of assignWork
//***********************************************************
// Get the next unit of work (work thread).
//***********************************************************
public synchronized TaskWrapper getNextWorkAssignment() {
// Wait until we can access the linked list
try {
if (!pmAvailable)
wait();
} catch(InterruptedException e) { }
pmAvailable = false;
notifyAll();
TaskWrapper nextTask;
if (!taskQueue.isEmpty())
nextTask = (TaskWrapper)taskQueue.removeFirst();
else
nextTask = null;
pmAvailable = true;
notifyAll();
return nextTask;
} // of getNextWorkAssignment
//***********************************************************
// Return the number of tasks waiting on the work queue.
//***********************************************************
public synchronized int workWaiting() {
// Wait until we can access the linked list
try {
if (!pmAvailable)
wait();
} catch(InterruptedException e) { }
pmAvailable = false;
notifyAll();
int qSize = taskQueue.size();
pmAvailable = true;
notifyAll();
return qSize;
} // of workWaiting
//***********************************************************
// Wait until all the tasks have been completed.
//***********************************************************
public void waitForCompletion() {
int prev = 0;
// Continue checking untill all the threads have
// completed and been removed.
while(!poolOfThreads.isEmpty()) {
// Lets record the difference
if (poolOfThreads.size() != prev) {
System.out.println("waiting for " + poolOfThreads.size() + " to complete");
prev = poolOfThreads.size();
}
// Loop through the thread pool and check each thread.
for (int t = 0; t < poolOfThreads.size(); t++)
if (!((WorkerThread)poolOfThreads.get(t)).working)
// This thead is done so remove it.
poolOfThreads.remove(t);
} // of while
} // waitForCompletion
//***********************************************************
// Set the max number of allow threads.
//***********************************************************
public synchronized void setPoolSize(int newLimit) {
// just set the limit.
// Check if available
while (!tmAvailable) {
try {
wait();
}
catch(InterruptedException ie) {
System.exit(5);
}
}
// Ok, lock the object while we update it.
tmAvailable = false;
notifyAll();
poolSize = newLimit;
// Reset the max counters/trackers too
maxReached = 0;
numTimesReached = 0;
// Create additional threads in the pool if needed
if (poolSize > poolOfThreads.size())
// Load up and initialize the threads.
for (int i = poolOfThreads.size(); i < poolSize; i++) {
Thread thread = new WorkerThread(this, i);
thread.start();
poolOfThreads.add(thread);
}
// Done, now release the object.
tmAvailable = true;
notifyAll();
} // of setPoolSize.
//***********************************************************
// The method to increment thread counter.
//***********************************************************
private synchronized void increment() {
// Check if available
while (!tmAvailable) {
try {
wait();
}
catch(InterruptedException ie) {
// logger.fatal("Synchronized error in ThreadPool ");
// logger.fatal("Error message is: " + ie.getMessage());
System.exit(5);
}
}
// Ok, lock the object while we update it.
tmAvailable = false;
notifyAll();
activeThreads++;
// Check if this is the highest number of threads
// that has been active, if not then set so.
if (activeThreads > maxReached)
maxReached = activeThreads;
if (activeThreads == poolSize)
numTimesReached++;
// Done, now release the object.
tmAvailable = true;
notifyAll();
} // of increment
//***********************************************************
// The method to increment thread counter.
//***********************************************************
public synchronized void decrement() {
// Check if available
while (!tmAvailable) {
try {
wait();
}
catch(InterruptedException ie) {
// logger.fatal("Synchronized error in ThreadPool ");
// logger.fatal("Error message is: " + ie.getMessage());
System.exit(5);
}
}
// Ok, lock the object while we update it.
tmAvailable = false;
notifyAll();
activeThreads--;
// Done, now release the object.
tmAvailable = true;
notifyAll();
} // of decrement
//***********************************************************
// A method to determine if the maximum number of active
// threads has been reached.
//***********************************************************
public synchronized boolean limitNotReached() {
boolean limit = false;
// Check if available
while (!tmAvailable) {
try {
wait();
}
catch(InterruptedException ie) {
// logger.fatal("Synchronized error in ThreadPool.");
// logger.fatal("Error message is: " + ie.getMessage());
System.exit(5);
}
}
// Ok, lock the object while we update it.
tmAvailable = false;
notifyAll();
if (activeThreads < poolSize)
limit = true;
// Done, now release the object.
tmAvailable = true;
notifyAll();
return limit;
} // of limitNotReached
//***********************************************************
// A method to return the number of active threads.
//***********************************************************
public synchronized int currentNum() {
// Check if available
while (!tmAvailable) {
try {
wait();
}
catch(InterruptedException ie) {
// logger.fatal("Synchronized error in ThreadPool");
// logger.fatal("Error message is: " + ie.getMessage());
System.exit(5);
}
}
// Ok, lock the object while we update it.
tmAvailable = false;
notifyAll();
int numberOf = activeThreads;
// Done, now release the object.
tmAvailable = true;
notifyAll();
return numberOf;
} // of currentNum
package threadpooling;
/**
*
* @author cferris
*
*This is the work thread that receives the
*taskes or unit of work that make up the
*thread pool.
*
*/
public class WorkerThread extends Thread {
// Define need instance fields.
public boolean working = true;
public int id = 0;
private int pauseFactor = 1;
// The thread pool that this object belongs to.
public ThreadPool owner;
//*********************************************************
// Define the constructor for the seperate thread.
//*********************************************************
public WorkerThread(ThreadPool pooler, int id) {
owner = pooler;
this.id = id;
} // of contructor
//*********************************************************
// Wait for work or for system shutdown.
//*********************************************************
public void run() {
// Define the work object.
TaskWrapper unitOfWork = null;
// Wait for work or until system is shutting down.
do {
unitOfWork = owner.getNextWorkAssignment();
if (unitOfWork != null) {
// First set name and then add salon to watch list.
String taskName = unitOfWork.taskType + "-" + id;
setName(taskName);
// System.out.println("Starting thread - " + getName());
unitOfWork.taskAtHand.run();
// Done, now adjust tracker and reset the pause factor.
owner.decrement();
pauseFactor = 1;
}
else {
// No work so pause a bit
try {
sleep(150 * pauseFactor);
} catch (InterruptedException e) { }
// If consecutively pausing because of no work then
// increase the pause time up to a point.
if (pauseFactor < 49)
pauseFactor++;
}
} while (!owner.shutDown);
// Set indicator this thread in the pool is done.
working = false;
} // of run
}
To use the thread pool just create a static instance of the ThreadPool in your main class
private static ThreadPool threadPool = new ThreadPool(getMaxThreads(), useDaemon());
Then to launch a new thread use this.
if (runOnSeperateThread()) {
WriteTransThread task = new WriteTransThread(messasge);
// Add to the thread pool.
threadPool.assignWork(new TaskWrapper(task, "my thread"));
}
private boolean runOnSeperateThread() {
// First check if mutli threading is turned on
// and if there are open threads.
if (multiThreadingOn && threadPool.limitNotReached())
// Yes, there are open threads.
return true;
// Check if we can wait for an open thread.
if (!multiThreadingOn || !waitForOpenThread)
// No multi threading or waiting allowed.
return false;
// OK, wait for an open thread, wait for up
// to 90 seconds.
int j = 0;
do {
try {
// Wait 1/2 a second.
Thread.sleep(500);
} catch(InterruptedException ignore) { };
// Check again for an optn thread.
if (threadPool.limitNotReached())
// Ok to run on separate thread.
return true;
// Increament counter and wait again.
j++;
} while(j < 180);
// If we got this far then we've waited long enough.
logger.warn("Reached limit on waiting for an open thread in the pool");
return false;
} // of runOnSeperateThread
The WriteTransThread is a class that will extract the message body and write out its data to a table.
Last, before you end your program make sure all the threads have competed by calling the following.
//***********************************************************
// If multi-threading is activated then this method will
// wait for them to complete before returning. The
// max wait time is 300 secs or 5 minutes.
//***********************************************************
private void waitForThreadsToComplete(String message) {
// First check for multi threading.
if (prop.useMultiThread() && (threadPool.currentNum() > 0)) {
int limit = 0;
// Some threads are still active, wait
logger.info("Waiting for threads to complete before " + message
+ " current thread count is " + threadPool.currentNum());
do {
limit++;
try {
Thread.sleep(500); // Sleep 1/2 a sec.
} catch(InterruptedException ie) { } // Ignore the error for now.
// Max waith is 300 sec or 5 min.
} while ((threadPool.currentNum() > 0) && (limit < 600));
}
} // of waitForThreadsToComplete