lib/openwfe/expool/threadedexpstorage.rb in ruote-0.9.18 vs lib/openwfe/expool/threadedexpstorage.rb in ruote-0.9.19

- old
+ new

@@ -1,34 +1,34 @@ # #-- # Copyright (c) 2007-2008, John Mettraux, OpenWFE.org # All rights reserved. -# -# Redistribution and use in source and binary forms, with or without +# +# Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: -# +# # . Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# . Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation +# list of conditions and the following disclaimer. +# +# . Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. -# +# # . Neither the name of the "OpenWFE" nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. #++ # # @@ -40,149 +40,152 @@ require 'thread' module OpenWFE + # + # This mixin gathers all the logic for a threaded expression storage, + # one that doesn't immediately stores workitems (removes overriding + # operations). + # Using this threaded storage brings a very important perf benefit. + # + module ThreadedStorageMixin + # - # This mixin gathers all the logic for a threaded expression storage, - # one that doesn't immediately stores workitems (removes overriding - # operations). - # Using this threaded storage brings a very important perf benefit. + # Will take care of stopping the 'queue processing' thread. # - module ThreadedStorageMixin + def stop - # - # Will take care of stopping the 'queue processing' thread. - # - def stop + @stopped = true + @queue.push :stop + end - @stopped = true - @queue.push :stop - end + # + # makes sure that the queue isn't actually preparing a batch + # before returning a result. + # + def find_expressions (options) - # - # makes sure that the queue isn't actually preparing a batch - # before returning a result. - # - def find_expressions (options) + Thread.pass - Thread.pass + @mutex.synchronize do + super + end + end - @mutex.synchronize do - super - end - end + protected - protected + # + # starts the thread that does the actual persistence. + # + def start_queue - # - # starts the thread that does the actual persistence. - # - def start_queue + @mutex = Mutex.new + @queue = Queue.new - @mutex = Mutex.new - @queue = Queue.new + Thread.new do - Thread.new do + loop do - loop do + events = [ @queue.pop ] - events = [ @queue.pop ] + @mutex.synchronize do - @mutex.synchronize do + 14.times { Thread.pass } + # + # gather some steam : + # let job accumulate - 14.times { Thread.pass } # gather some steam + @queue.size.times do + events << @queue.pop + end - @queue.size.times do - events << @queue.pop - end - - process_events events - end - - break if events.include?(:stop) - end - end + process_events events end - # - # queues an event for later (well within a second) persistence - # - def queue (event, fei, fexp=nil) + break if events.include?(:stop) + end + end + end - if @stopped - process_event event, fei, fexp - else - @queue.push [ event, fei, fexp ] - end - end + # + # queues an event for later (well within a second) persistence + # + def queue (event, fei, fexp=nil) - def process_events (events) + if @stopped + process_event event, fei, fexp + else + @queue.push [ event, fei, fexp ] + end + end - ldebug { "process_events() #{events.size} events" } + def process_events (events) - # reducing the operation count + ldebug { "process_events() #{events.size} events" } - events = events.inject({}) do |r, event| - r[event[1]] = event if event != :stop - r - end + # reducing the operation count - ldebug { "process_events() #{events.size} events remaining" } + events = events.inject({}) do |r, event| + r[event[1]] = event if event != :stop + r + end - # perform the remaining operations + ldebug { "process_events() #{events.size} events remaining" } - events.each_value do |event, fei, fexp| + # perform the remaining operations - process_event event, fei, fexp - end - end + events.each_value do |event, fei, fexp| - def process_event (event, fei, fexp) + process_event event, fei, fexp + end + end - begin - if event == :update - self[fei] = fexp - else - safe_delete fei - end - rescue Exception => e - lwarn do - "process_event() ':#{event}' exception\n" + - OpenWFE::exception_to_s(e) - end - end - end + def process_event (event, fei, fexp) - # - # a call to delete that tolerates missing .yaml files - # - def safe_delete (fei) - begin - self.delete fei - rescue Exception => e - # lwarn do - # "safe_delete() exception\n" + - # OpenWFE::exception_to_s(e) - # end - end - end + begin + if event == :update + self[fei] = fexp + else + safe_delete fei + end + rescue Exception => e + lwarn do + "process_event() ':#{event}' exception\n" + + OpenWFE::exception_to_s(e) + end + end + end - # - # Adds the queue() method as an observer to the update and remove - # events of the expression pool. - # :update and :remove mean changes to expressions in the persistence - # that's why they are observed. - # - def observe_expool + # + # a call to delete that tolerates missing .yaml files + # + def safe_delete (fei) + begin + self.delete fei + rescue Exception => e + # lwarn do + # "safe_delete() exception\n" + + # OpenWFE::exception_to_s(e) + # end + end + end - get_expression_pool.add_observer(:update) do |event, fei, fe| - ldebug { ":update for #{fei.to_debug_s}" } - queue event, fei, fe - end - get_expression_pool.add_observer(:remove) do |event, fei| - ldebug { ":remove for #{fei.to_debug_s}" } - queue event, fei - end - end - end + # + # Adds the queue() method as an observer to the update and remove + # events of the expression pool. + # :update and :remove mean changes to expressions in the persistence + # that's why they are observed. + # + def observe_expool + + get_expression_pool.add_observer(:update) do |event, fei, fe| + ldebug { ":update for #{fei.to_debug_s}" } + queue event, fei, fe + end + get_expression_pool.add_observer(:remove) do |event, fei| + ldebug { ":remove for #{fei.to_debug_s}" } + queue event, fei + end + end + end end