lib/openwfe/util/workqueue.rb in openwferu-0.9.13 vs lib/openwfe/util/workqueue.rb in openwferu-0.9.14
- old
+ new
@@ -41,135 +41,78 @@
require 'openwfe/utils'
module OpenWFE
+ #
+ # This mixin provides a workqueue and a thread for executing tasks
+ # pushed onto it. It uses the thread.rb Queue class.
+ #
+ # It is currently only used by the ExpressionPool (maybe it'll get
+ # merged back into it later).
+ #
module WorkqueueMixin
- WQF_LOW = 0.500
-
#
- # Starts the workqueue with a given frequency (seconds)
+ # Creates and starts the workqueue.
#
def start_workqueue
- @workqueue_mutex = Mutex.new
- @workqueue = []
- @workqueue_frequency = 0
+ @workqueue = Queue.new
+ @workstopped = false
+
OpenWFE::call_in_thread "workqueue", self do
- while true
- break if @workqueue_frequency == nil
- sleep @workqueue_frequency
- do_process_workqueue
+ loop do
+ do_process_workelement @workqueue.pop
+ break if @workstopped and @workqueue.empty?
end
end
end
#
- # Returns true if there is or there just was activity for the ]
+ # Returns true if there is or there just was activity for the
# work queue.
#
def is_workqueue_busy?
- (@workqueue.size > 0 or @workqueue_frequency < WQF_LOW)
+
+ @workqueue.size > 0
end
#
# Stops the workqueue.
#
def stop_workqueue
- @workqueue_frequency = nil
- do_process_workqueue
- #
- # maybe could process the work queue until it's really empty
+
+ @workstopped = true
end
#
# the method called by the mixer to actually queue the work.
#
def queue_work (*args)
- @workqueue_mutex.synchronize do
- if @workqueue
- @workqueue.push args
- #
- # work will be done later (millisec order)
- # by the work thread
- else
- do_process_workelement args
- #
- # degraded mode : as if there were no workqueue
- end
- end
- end
+ if @workqueue_stopped
- #
- # Returns the current workqueue size
- #
- def workqueue_size
- return 0 unless @workqueue
- @workqueue.size
- end
-
- #
- # Called by the workqueue thread, copies the workqueue and clears
- # it, and then processes the elt in the copied workqueue.
- # (meanwhile, the empty workqueue gets filled by queue_work()
- # calls)
- #
- # This method calls the do_process_workelement() method of
- # its 'mixer' with each work element queued.
- #
- def do_process_workqueue
-
- q = nil
-
- @workqueue_mutex.synchronize do
-
- if @workqueue.size < 1
- increment_workqueue_frequency
- return
- end
-
- q = Array.new(@workqueue)
- @workqueue.clear
-
- @workqueue_frequency = 0
+ do_process_workelement args
#
- # back to maximum reponsiveness
- end
+ # degraded mode : as if there were no workqueue
+ else
- #ldebug { "do_process_workqueue() #{q.size} items to process" }
-
- q.each do |elt|
-
- do_process_workelement elt
+ @workqueue.push args
#
- # exception management is now done in the
- # do_process_workelement method
+ # work will be done later (millisec order)
+ # by the work thread
end
-
- # TODO : have a pool of workers handle that
-
- #q.size
end
- protected
-
- #
- # No activity spotted, so decrease responsiveness
- #
- def increment_workqueue_frequency
-
- return if @workqueue_frequency == nil
-
- @workqueue_frequency *= 3
-
- if @workqueue_frequency == 0
- @workqueue_frequency = 0.001
- elsif @workqueue_frequency > WQF_LOW
- @workqueue_frequency = WQF_LOW
- end
- end
+ #--
+ # Returns the current workqueue size
+ #
+ #def workqueue_size
+ # return 0 unless @workqueue
+ # @workqueue.size
+ #end
+ #++
end
end