stop and garbage collect a network of threads

Hi,

i have written a p2p network simulator. Each peer is represented by a thread and owns a Mailbox (a Monitor). Threads/peers own references to mailboxes of other threads/peers -and places data there- and their mailboxes are "pointed to" by other threads/peers. A peer/thread is blocking/waiting if its Mailbox is empty. A peer/thread may also be in a sleep state.

The run method of the Peer (implements Runnable) class:

publicvoid run()

{

while (executable_)

{

cat_.info(thisPeerId_+" processing messages ...");

Iterator<MsgInterface> iter=getMessages().iterator();

while (iter.hasNext())

{

MsgInterface msg=iter.next();

processMessage((Msg)msg);

}

try

{

Thread.sleep(DELAY);

}

catch (InterruptedException e)

{

thisMailbox_=null;

thePit_=null;

return;

}

}

}//run

(1) How can i kill all this threads? One method i have used is calling the "setExecutable(boolean flag)" on each peer and changing the flag of the outer while loop of the run() method. This doesn't work since most threads block most of the time and they do not revalidate the while condition.

(2) Another method i have used is calling the interrupt method on each thread/Peer, in order the

thisMailbox_=null;

thePit_=null;

return;

to be executed. However, since it is waiting and not sleeping most of the time, this doesn't work either.

(3) I will probably have to do something with the Monitors and the "waiting" state of the threads.

Can somebody please provide advice? I probably need a combination to handle all (1),(2),(3) above -isn't it?

Another question; in the general case, one does a setExecutable(false) on a thread -and a thread validates the condition and stops running, would the thread get garbage collected?

Thanks a lot!

[2665 byte] By [uiga] at [2007-11-27 4:24:48]
# 1

executable_ Needs to be volatile or the current thread may never see a change. Then when you change the value of executable_ the thread will exit the run() method and die.

Since you haven't provided just what setExecutable(boolean flag) is, I can't comment on that.

I don't see any code to wait(), so I have no idea what you are talking about as far as waiting is concerned.

cooper6a at 2007-7-12 9:32:43 > top of Java-index,Core,Core APIs...
# 2

Here's the Mailbox code:

package swarm.sys.mail;

import java.util.*;

import java.io.*;

import org.apache.log4j.*;

import swarm.sys.id.*;

import swarm.sys.interfaces.*;

import swarm.sys.common.*;

import swarm.sys.peers.messages.*;

import swarm.sys.farm.*;

import swarm.sys.peers.*;

public abstract class Mailbox implements Serializable

{

/************************************CONSTRUCTOR_FIELDS***************************************/

protected IDInterface id_;

protected String label_;

/************************************CONSTRUCTOR_FIELDS***************************************/

protected boolean empty_=true;

protected Vector<MsgInterface> msgBox_=new Vector<MsgInterface>();

/* to be used by the Peers

*/

protected Mailbox(IDInterface id)

{

label_="no-label";

id_=id;

}//Mailbox

/* to be used by the monitor and the farms;

* label can be: FIN_MAIL, FOUT_MAIL, FPIT_MAIL, MIN_MAIL, SMIN_MAIL

*/

protected Mailbox(String label)

{

id_=new ID("UNSPECIFIED",0);

label_=label;

}//Mailbox

/* label cannot change after Mailbox creation, so method

* doesn't need to be synchronized

*/

public String getLabel()

{

return label_;

}//getLabel

/* id cannot change after Mailbox creation, so method

* doesn't need to be synchronized

*/

public IDInterface getId()

{

return id_;

}//getId

public synchronized boolean isEmpty()

{

return empty_;

}//isEmpty

public abstract Collection<MsgInterface> removeAndGetMessages();

public abstract void addMessage(MsgInterface msg);

public synchronized Collection<MsgInterface> messageSnapshot()

{

Collection<MsgInterface> msgBoxCopy=new HashSet<MsgInterface>();

msgBoxCopy.addAll(msgBox_);

return msgBoxCopy;

}//messageSnapshot

public synchronized String toString()

{

String s="";

s=s+'\n';

s=s+"--"+'\n';

s=s+" Mailbox: "+label_.toString()+'\n';

s=s+"--"+'\n';

if (empty_)

{

s=s+"empty"+'\n';

}

else

{

for (MsgInterface msg: msgBox_)

{

if (msg.getMsgId()!=null)

s=s+msg.toString()+'\n';

else

s=s+msg.getMsgId().toString()+'\n';

}

}

return s;

}//toString

public static Mailbox newMailbox(Object o)

{

if (o instanceof IDInterface)

{

return new PeerMailbox((IDInterface)o);

}//instanceof PeerBasic

else

if (o instanceof String)

{

String type=(String)o;

if (type.equals("FIN_MAIL"))

{

return new FarmInMailbox();

}//FarmInMailbox

else if (type.equals("FOUT_MAIL"))

{

return new FarmOutMailbox();

}//FarmOutMailbox

else if (type.equals("FPIT_MAIL"))

{

return new FarmPitMailbox();

}//FarmPitMailbox

else if (type.equals("MPIT_MAIL"))

{

return new MonitorPitMailbox();

}//MonitorPitMailbox

else if (type.equals("MIN_MAIL"))

{

return new MonitorInMailbox();

}//MonitorInMailbox

else if (type.equals("MOUT_MAIL"))

{

return new MonitorOutMailbox();

}//MonitorOutMailbox

else if (type.equals("MSIN_MAIL"))

{

return new MonitorSInMailbox();

}//MonitorSInMailbox

else if (type.equals("MSOUT_MAIL"))

{

return new MonitorSOutMailbox();

}//MonitorSInMailbox

else if (type.equals("FSOUT_MAIL"))

{

return new FarmSOutMailbox();

}//FarmSOutMailbox

else if (type.equals("FSIN_MAIL"))

{

return new FarmSInMailbox();

}//FarmSInMailbox

}//instanceof String

return null;

}//newMailbox

}

and here's the PeerMailbox code -there are other kind of mailboxes too:

package swarm.sys.mail;

import org.apache.log4j.*;

import java.util.*;

import java.net.*;

import swarm.sys.farm.*;

import swarm.sys.interfaces.*;

import swarm.sys.common.*;

public class PeerMailbox extends Mailbox

{

private static transient Logger cat_=Farm.getLogger();

private String hostname_;

public PeerMailbox(IDInterface id)

{

super(id);

try

{

hostname_= Net.extractHostname(InetAddress.getLocalHost());

}

catch (UnknownHostException ex)

{

ex.printStackTrace();

}

}//PeerMail

public synchronized Vector<MsgInterface> removeAndGetMessages()

{

while (empty_)

{

try

{

wait();

}

catch(InterruptedException e)

{

}

}

Vector<MsgInterface> msgBoxCopy=new Vector<MsgInterface>();

msgBoxCopy.addAll(msgBox_);

//logging

String nsMsgId="";

if (!(cat_.getLevel().equals(Level.OFF)))

{

Iterator<MsgInterface> iMsgBox=msgBox_.iterator();

while (iMsgBox.hasNext())

{

IDInterface anId=iMsgBox.next().getMsgId();

nsMsgId=nsMsgId+ anId.toString()+" ";

}

}

cat_.info(id_+" PeerMail: removing all messages with identifiers: "+nsMsgId);

//logging

msgBox_.clear();

empty_=true;

notifyAll();

return msgBoxCopy;

}//removeAndGetMessages

public synchronized void addMessage(MsgInterface msg)

{

msgBox_.add(msg);

cat_.info(id_+" PeerMail: added message "+ msg.getMsgId());

empty_=false;

notifyAll();

}//addMessage

public synchronized String toString()

{

String s="";

s=s+'\n';

s=s+"--"+'\n';

s=s+" Mailbox: "+id_.toString()+'\n';

s=s+"--"+'\n';

if (empty_)

{

s=s+"empty"+'\n';

}

else

{

Iterator<MsgInterface> iter=msgBox_.iterator();

while (iter.hasNext())

{

s=s+iter.next().getMsgId().toString()+'\n';

}

}

return s;

}//toString

}

uiga at 2007-7-12 9:32:44 > top of Java-index,Core,Core APIs...
# 3

Throwing code at the problem never solved anything. When you post code, only post what is necessary.

I still don't see a reference to executable_ or setExecutable(boolean flag).

Your original question has to do with ending the thread. Exiting in the run() method does that.

cooper6a at 2007-7-12 9:32:44 > top of Java-index,Core,Core APIs...
# 4
>>cooper6Sorry for straying from the topic (slightly), but I don't really see how can executable_ not ever get rechecked? Wouldn't run() finish in the next iteration?p.s.yes, I am aware that I might not be getting some basic concepts here...
jadespirita at 2007-7-12 9:32:44 > top of Java-index,Core,Core APIs...
# 5

I'm looking at your first post with:

while (executable_)

{

If executable_ isn't volatile, then the while{} may never see a change, depending on the compiler, hot-spot etc. The logic here is that since it's not volatile no other thread will touch it. Since your code doesn't change it, it never will be changed so ignore it. Compilers assume many things. Some good, some not so good.

cooper6a at 2007-7-12 9:32:44 > top of Java-index,Core,Core APIs...