platform/bb/RubyVM/src/com/rho/ThreadQueue.java in rhodes-2.2.6 vs platform/bb/RubyVM/src/com/rho/ThreadQueue.java in rhodes-2.3.0.beta.1

- old
+ new

@@ -18,69 +18,79 @@ public interface IQueueCommand { public abstract boolean equals(IQueueCommand cmd); public abstract String toString(); - public abstract void execute(); }; - RhoClassFactory m_ptrFactory; - int m_nPollInterval; - Mutex m_mxStackCommands = new Mutex(); - LinkedList m_stackCommands = new LinkedList(); + private RhoClassFactory m_ptrFactory; + private int m_nPollInterval; + private Object m_mxStackCommands;// = new Mutex(); + private LinkedList m_stackCommands = new LinkedList(); boolean m_bNoThreaded; - int getPollInterval(){ return m_nPollInterval;} + public abstract void processCommand(IQueueCommand pCmd); + public void onTimeout(){} + + public int getPollInterval(){ return m_nPollInterval;} - boolean isNoThreadedMode(){ return m_bNoThreaded; } - void setNonThreadedMode(boolean b){m_bNoThreaded = b;} + public boolean isNoThreadedMode(){ return m_bNoThreaded; } + public void setNonThreadedMode(boolean b){m_bNoThreaded = b;} public RhoClassFactory getFactory(){ return m_ptrFactory; } - int getLastPollInterval(){ return 0;} - //public abstract void processCommand(IQueueCommand pCmd); - boolean isSkipDuplicateCmd() { return false; } + public int getLastPollInterval(){ return 0;} + public boolean isSkipDuplicateCmd() { return false; } public ThreadQueue(RhoClassFactory factory) { super(factory); m_nPollInterval = QUEUE_POLL_INTERVAL_SECONDS; m_bNoThreaded = false; m_ptrFactory = factory; + + m_mxStackCommands = getSyncObject(); } + protected void addQueueCommandInt(IQueueCommand pCmd) + { + LOG.INFO("addCommand: " + pCmd.toString()); + + synchronized(m_mxStackCommands) + { + boolean bExist = false; + if ( isSkipDuplicateCmd() ) + { + for ( int i = 0; i < (int)m_stackCommands.size(); i++ ) + { + if ( m_stackCommands.get(i).equals(pCmd) ) + { + LOG.INFO("Command already exists in queue. Skip it."); + bExist = true; + break; + } + } + } + + if ( !bExist ) + m_stackCommands.add(pCmd); + } + } + public void addQueueCommand(IQueueCommand pCmd) { - LOG.INFO("addCommand: " + pCmd.toString() ); - synchronized(m_mxStackCommands) - { - boolean bExist = false; - if ( isSkipDuplicateCmd() ) - { - for ( int i = 0; i < (int)m_stackCommands.size(); i++ ) - { - if ( m_stackCommands.get(i).equals(pCmd) ) - { - bExist = true; - break; - } - } - } + addQueueCommandInt(pCmd); - if ( !bExist ) - m_stackCommands.add(pCmd); - } - if ( isNoThreadedMode() ) processCommands(); - else - stopWait(); + else if ( isAlive() ) + stopWait(); } - + public void run() { LOG.INFO("Starting main routine..."); int nLastPollInterval = getLastPollInterval(); @@ -95,25 +105,38 @@ nWait = QUEUE_STARTUP_INTERVAL_SECONDS; else nWait = nWait2; } - if ( nWait >= 0 && !isStopping() && isNoCommands() ) - { - LOG.INFO("ThreadQueue blocked for " + nWait + " seconds..."); - wait(nWait); + synchronized(m_mxStackCommands) + { + if ( nWait >= 0 && !isStopping() && isNoCommands() ) + { + LOG.INFO("ThreadQueue blocked for " + nWait + " seconds..."); + wait(nWait); + + if ( isNoCommands() ) + onTimeout(); + } } nLastPollInterval = 0; if ( !isStopping() ) - processCommands(); + { + try{ + processCommands(); + }catch(Exception e) + { + LOG.ERROR("processCommand failed", e); + } + } } LOG.INFO("Thread shutdown"); } - boolean isNoCommands() + public boolean isNoCommands() { boolean bEmpty = false; synchronized(m_mxStackCommands) { bEmpty = m_stackCommands.isEmpty(); @@ -134,16 +157,12 @@ processCommand(pCmd); } } - public void processCommand(IQueueCommand pCmd) - { - pCmd.execute(); - } - public void setPollInterval(int nInterval) { - m_nPollInterval = nInterval; - stopWait(); + m_nPollInterval = nInterval; + if ( isAlive() ) + stopWait(); } }