lib/openwfe/util/workqueue.rb in openwferu-0.9.13 vs lib/openwfe/util/workqueue.rb in openwferu-0.9.14

- old
+ new

@@ -41,135 +41,78 @@ require 'openwfe/utils' module OpenWFE + # + # This mixin provides a workqueue and a thread for executing tasks + # pushed onto it. It uses the thread.rb Queue class. + # + # It is currently only used by the ExpressionPool (maybe it'll get + # merged back into it later). + # module WorkqueueMixin - WQF_LOW = 0.500 - # - # Starts the workqueue with a given frequency (seconds) + # Creates and starts the workqueue. # def start_workqueue - @workqueue_mutex = Mutex.new - @workqueue = [] - @workqueue_frequency = 0 + @workqueue = Queue.new + @workstopped = false + OpenWFE::call_in_thread "workqueue", self do - while true - break if @workqueue_frequency == nil - sleep @workqueue_frequency - do_process_workqueue + loop do + do_process_workelement @workqueue.pop + break if @workstopped and @workqueue.empty? end end end # - # Returns true if there is or there just was activity for the ] + # Returns true if there is or there just was activity for the # work queue. # def is_workqueue_busy? - (@workqueue.size > 0 or @workqueue_frequency < WQF_LOW) + + @workqueue.size > 0 end # # Stops the workqueue. # def stop_workqueue - @workqueue_frequency = nil - do_process_workqueue - # - # maybe could process the work queue until it's really empty + + @workstopped = true end # # the method called by the mixer to actually queue the work. # def queue_work (*args) - @workqueue_mutex.synchronize do - if @workqueue - @workqueue.push args - # - # work will be done later (millisec order) - # by the work thread - else - do_process_workelement args - # - # degraded mode : as if there were no workqueue - end - end - end + if @workqueue_stopped - # - # Returns the current workqueue size - # - def workqueue_size - return 0 unless @workqueue - @workqueue.size - end - - # - # Called by the workqueue thread, copies the workqueue and clears - # it, and then processes the elt in the copied workqueue. - # (meanwhile, the empty workqueue gets filled by queue_work() - # calls) - # - # This method calls the do_process_workelement() method of - # its 'mixer' with each work element queued. - # - def do_process_workqueue - - q = nil - - @workqueue_mutex.synchronize do - - if @workqueue.size < 1 - increment_workqueue_frequency - return - end - - q = Array.new(@workqueue) - @workqueue.clear - - @workqueue_frequency = 0 + do_process_workelement args # - # back to maximum reponsiveness - end + # degraded mode : as if there were no workqueue + else - #ldebug { "do_process_workqueue() #{q.size} items to process" } - - q.each do |elt| - - do_process_workelement elt + @workqueue.push args # - # exception management is now done in the - # do_process_workelement method + # work will be done later (millisec order) + # by the work thread end - - # TODO : have a pool of workers handle that - - #q.size end - protected - - # - # No activity spotted, so decrease responsiveness - # - def increment_workqueue_frequency - - return if @workqueue_frequency == nil - - @workqueue_frequency *= 3 - - if @workqueue_frequency == 0 - @workqueue_frequency = 0.001 - elsif @workqueue_frequency > WQF_LOW - @workqueue_frequency = WQF_LOW - end - end + #-- + # Returns the current workqueue size + # + #def workqueue_size + # return 0 unless @workqueue + # @workqueue.size + #end + #++ end end