# #-- # Copyright (c) 2007, John Mettraux, OpenWFE.org # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # . Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # . Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # . Neither the name of the "OpenWFE" nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. 
#++
#
#
# "made in Japan"
#
# John Mettraux at openwfe.org
#

require 'thread'
require 'openwfe/utils'

module OpenWFE

  #
  # Gives the including class (the 'mixer') a simple polled work queue :
  # work is pushed via queue_work() and later handed, element by element,
  # to the mixer's do_process_workelement() method.
  #
  # The mixer must implement do_process_workelement(elt), where elt is
  # the array of arguments that was passed to queue_work().
  #
  # @workqueue_frequency holds the pause (in seconds) between two polls
  # of the queue; nil means "workqueue stopped". Once the workqueue is
  # started, this variable is only written while holding
  # @workqueue_mutex.
  #
  module WorkqueueMixin

    #
    # Longest pause (in seconds) between two polls of the queue. A
    # current pause shorter than this value is interpreted as "there
    # was activity recently" by is_workqueue_busy?
    #
    WQF_LOW = 0.500

    #
    # Starts the workqueue : (re)initializes the mutex, the queue and
    # the polling pause (0, i.e. maximum responsiveness), then hands the
    # polling loop to OpenWFE::call_in_thread (presumably defined in
    # openwfe/utils and running the block in a thread of its own).
    #
    # The loop exits as soon as stop_workqueue() has been called.
    #
    def start_workqueue

      @workqueue_mutex = Mutex.new
      @workqueue = []
      @workqueue_frequency = 0

      OpenWFE::call_in_thread "workqueue", self do
        loop do
          #
          # read the pause only once per iteration : stop_workqueue()
          # may set it to nil at any time and 'sleep nil' raises
          #
          pause = @workqueue_frequency
          break if pause.nil?

          sleep pause
          do_process_workqueue
        end
      end
    end

    #
    # Returns true if there is or there just was activity for the
    # work queue.
    #
    # Returns false when the queue is empty and the workqueue is
    # stopped (or was never started).
    #
    def is_workqueue_busy?

      return true if workqueue_size > 0

      pause = @workqueue_frequency

      (!pause.nil?) && pause < WQF_LOW
    end

    #
    # Stops the workqueue : marks it as stopped (the polling loop will
    # exit at its next iteration) and processes the work that is still
    # queued.
    #
    # Does nothing more than setting the stop marker if the workqueue
    # was never started.
    #
    def stop_workqueue

      unless @workqueue_mutex
        @workqueue_frequency = nil
        return
      end

      #
      # the stop marker is set under the mutex so that a concurrent
      # increment_workqueue_frequency() cannot overwrite it
      #
      @workqueue_mutex.synchronize { @workqueue_frequency = nil }

      do_process_workqueue
        #
        # maybe could process the work queue until it's really empty
    end

    #
    # The method called by the mixer to actually queue the work.
    #
    # When the workqueue is running, the args are pushed (as one array)
    # to the queue and will be processed later by the work thread.
    #
    # When the workqueue was never started or has been stopped, the work
    # is done immediately, in the caller's thread (degraded mode : as if
    # there were no workqueue).
    #
    def queue_work(*args)

      queued = false

      if @workqueue_mutex
        @workqueue_mutex.synchronize do
          if @workqueue && !@workqueue_frequency.nil?
            @workqueue.push args
            queued = true
          end
        end
      end

      #
      # degraded mode, outside of the mutex : Mutex is not reentrant and
      # do_process_workelement() may itself call queue_work()
      #
      do_process_workelement(args) unless queued
    end

    #
    # Returns the current workqueue size (0 if the workqueue was never
    # started).
    #
    def workqueue_size

      return 0 unless @workqueue

      @workqueue.size
    end

    #
    # Called by the workqueue thread (and by stop_workqueue()), copies
    # the workqueue and clears it, and then processes the elts in the
    # copied workqueue.
    # (meanwhile, the empty workqueue gets filled by queue_work()
    # calls)
    #
    # This method calls the do_process_workelement() method of
    # its 'mixer' with each work element queued.
    #
    # Returns nil when there was nothing to process (or when the
    # workqueue was never started), else the array of processed
    # elements.
    #
    def do_process_workqueue

      return nil unless @workqueue_mutex

      batch = nil

      @workqueue_mutex.synchronize do

        if @workqueue.empty?
          increment_workqueue_frequency
          return nil
        end

        batch = @workqueue.dup
        @workqueue.clear

        #
        # back to maximum responsiveness, unless the workqueue got
        # stopped (nil has to stay nil or the polling loop never exits)
        #
        @workqueue_frequency = 0 unless @workqueue_frequency.nil?
      end

      #ldebug { "do_process_workqueue() #{q.size} items to process" }

      batch.each do |elt|

        do_process_workelement elt
          #
          # exception management is now done in the
          # do_process_workelement method
      end
        # TODO : have a pool of workers handle that
    end

    protected

      #
      # No activity spotted, so decrease responsiveness : the pause
      # is tripled (starting from 0.001s) and capped at WQF_LOW.
      #
      # Does nothing if the workqueue is stopped. Called while holding
      # @workqueue_mutex (see do_process_workqueue()).
      #
      def increment_workqueue_frequency

        pause = @workqueue_frequency

        return if pause.nil?

        pause *= 3

        if pause == 0
          pause = 0.001
        elsif pause > WQF_LOW
          pause = WQF_LOW
        end

        @workqueue_frequency = pause
      end
  end
end