platform/bb/RubyVM/src/com/rho/ThreadQueue.java in rhodes-2.2.6 vs platform/bb/RubyVM/src/com/rho/ThreadQueue.java in rhodes-2.3.0.beta.1
- old
+ new
@@ -18,69 +18,79 @@
public interface IQueueCommand
{
public abstract boolean equals(IQueueCommand cmd);
public abstract String toString();
- public abstract void execute();
};
- RhoClassFactory m_ptrFactory;
- int m_nPollInterval;
- Mutex m_mxStackCommands = new Mutex();
- LinkedList m_stackCommands = new LinkedList();
+ private RhoClassFactory m_ptrFactory;
+ private int m_nPollInterval;
+ private Object m_mxStackCommands;// = new Mutex();
+ private LinkedList m_stackCommands = new LinkedList();
boolean m_bNoThreaded;
- int getPollInterval(){ return m_nPollInterval;}
+ public abstract void processCommand(IQueueCommand pCmd);
+ public void onTimeout(){}
+
+ public int getPollInterval(){ return m_nPollInterval;}
- boolean isNoThreadedMode(){ return m_bNoThreaded; }
- void setNonThreadedMode(boolean b){m_bNoThreaded = b;}
+ public boolean isNoThreadedMode(){ return m_bNoThreaded; }
+ public void setNonThreadedMode(boolean b){m_bNoThreaded = b;}
public RhoClassFactory getFactory(){ return m_ptrFactory; }
- int getLastPollInterval(){ return 0;}
- //public abstract void processCommand(IQueueCommand pCmd);
- boolean isSkipDuplicateCmd() { return false; }
+ public int getLastPollInterval(){ return 0;}
+ public boolean isSkipDuplicateCmd() { return false; }
public ThreadQueue(RhoClassFactory factory)
{
super(factory);
m_nPollInterval = QUEUE_POLL_INTERVAL_SECONDS;
m_bNoThreaded = false;
m_ptrFactory = factory;
+
+ m_mxStackCommands = getSyncObject();
}
+ protected void addQueueCommandInt(IQueueCommand pCmd)
+ {
+ LOG.INFO("addCommand: " + pCmd.toString());
+
+ synchronized(m_mxStackCommands)
+ {
+ boolean bExist = false;
+ if ( isSkipDuplicateCmd() )
+ {
+ for ( int i = 0; i < (int)m_stackCommands.size(); i++ )
+ {
+ if ( m_stackCommands.get(i).equals(pCmd) )
+ {
+ LOG.INFO("Command already exists in queue. Skip it.");
+ bExist = true;
+ break;
+ }
+ }
+ }
+
+ if ( !bExist )
+ m_stackCommands.add(pCmd);
+ }
+ }
+
public void addQueueCommand(IQueueCommand pCmd)
{
- LOG.INFO("addCommand: " + pCmd.toString() );
- synchronized(m_mxStackCommands)
- {
- boolean bExist = false;
- if ( isSkipDuplicateCmd() )
- {
- for ( int i = 0; i < (int)m_stackCommands.size(); i++ )
- {
- if ( m_stackCommands.get(i).equals(pCmd) )
- {
- bExist = true;
- break;
- }
- }
- }
+ addQueueCommandInt(pCmd);
- if ( !bExist )
- m_stackCommands.add(pCmd);
- }
-
if ( isNoThreadedMode() )
processCommands();
- else
- stopWait();
+ else if ( isAlive() )
+ stopWait();
}
-
+
public void run()
{
LOG.INFO("Starting main routine...");
int nLastPollInterval = getLastPollInterval();
@@ -95,25 +105,38 @@
nWait = QUEUE_STARTUP_INTERVAL_SECONDS;
else
nWait = nWait2;
}
- if ( nWait >= 0 && !isStopping() && isNoCommands() )
- {
- LOG.INFO("ThreadQueue blocked for " + nWait + " seconds...");
- wait(nWait);
+ synchronized(m_mxStackCommands)
+ {
+ if ( nWait >= 0 && !isStopping() && isNoCommands() )
+ {
+ LOG.INFO("ThreadQueue blocked for " + nWait + " seconds...");
+ wait(nWait);
+
+ if ( isNoCommands() )
+ onTimeout();
+ }
}
nLastPollInterval = 0;
if ( !isStopping() )
- processCommands();
+ {
+ try{
+ processCommands();
+ }catch(Exception e)
+ {
+ LOG.ERROR("processCommand failed", e);
+ }
+ }
}
LOG.INFO("Thread shutdown");
}
- boolean isNoCommands()
+ public boolean isNoCommands()
{
boolean bEmpty = false;
synchronized(m_mxStackCommands)
{
bEmpty = m_stackCommands.isEmpty();
@@ -134,16 +157,12 @@
processCommand(pCmd);
}
}
- public void processCommand(IQueueCommand pCmd)
- {
- pCmd.execute();
- }
-
public void setPollInterval(int nInterval)
{
- m_nPollInterval = nInterval;
- stopWait();
+ m_nPollInterval = nInterval;
+ if ( isAlive() )
+ stopWait();
}
}