lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.15 vs lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.16

- old
+ new

@@ -39,10 +39,11 @@ # require 'openwfe/utils' require 'openwfe/storage/yamlcustom' require 'openwfe/storage/yamlfilestorage' +require 'openwfe/expool/threadedexpstorage' require 'openwfe/expressions/flowexpression' require 'openwfe/expressions/raw_xml' # # making sure classes in those files are loaded @@ -72,26 +73,24 @@ # Iterates on each expression that is of the given kind. # Used for example by the expression pool when rescheduling. # def each_of_kind (kind, &block) - return unless block - each_object_path do |path| #ldebug { "each_of_kind() path is #{path}" } #next unless matches(path, kind) # was not OK in case of <bob activity="clean office" /> - expression = load_object(path) + expression = load_object path next unless expression.is_a?(kind) expression.application_context = @application_context - block.call expression + block.call expression.fei, expression end end # # "each flow expression" : this method awaits a block then, for @@ -104,12 +103,14 @@ each_object_path do |path| a = self.class.split_file_path path next unless a + wfid = a[0] next if wfid_prefix and ( ! wfid.match "^#{wfid_prefix}") + flow_expression = load_object path block.call flow_expression.fei, flow_expression end end @@ -176,155 +177,9 @@ # if OpenWFE::ends_with(path, "_#{exp_name}.yaml") # end # false #end #++ - end - - # - # This mixin gathers all the logic for a threaded expression storage, - # one that doesn't immediately stores workitems (removes overriding - # operations). - # Using this threaded storage brings a very important perf benefit. - # - module ThreadedStorageMixin - - THREADED_FREQ = "427" # milliseconds - # - # the frequency at which the event queue should be processed - - # - # Will take care of stopping the 'queue processing' thread. - # - def stop - - get_scheduler.unschedule(@thread_id) if @thread_id - - process_queue() - # - # flush every remaining events (especially the :delete ones) - end - - # - # calls process_queue() before the call the super class each_of_kind() - # method - # - def each_of_kind (kind, &block) - - #ldebug { "each_of_kind()" } - - process_queue() - super - end - - # - # calls process_queue() before the call the super class each() - # method - # - def each (wfid_prefix=nil, &block) - - process_queue() - super - end - - protected - - # - # starts the thread that does the actual persistence. - # - def start_processing_thread - - @events = {} - @op_count = 0 - - @thread_id = get_scheduler.schedule_every THREADED_FREQ do - process_queue - end - end - - # - # queues an event for later (well within a second) persistence - # - def queue (event, fei, fe=nil) - synchronize do - - old_size = @events.size - @op_count += 1 - - @events[fei] = [ event, fei, fe ] - - ldebug do - "queue() ops #{@op_count} "+ - "size #{old_size} -> #{@events.size}" - end - end - end - - # - # the actual "do persist" order - # - def process_queue - - return unless @events.size > 0 - # - # trying to exit as quickly as possible - - ldebug do - "process_queue() #{@events.size} events #{@op_count} ops" - end - - synchronize do - @events.each_value do |v| - event = v[0] - begin - if event == :update - self[v[1]] = v[2] - else - safe_delete(v[1]) - end - rescue Exception => e - lwarn do - "process_queue() ':#{event}' exception\n" + - OpenWFE::exception_to_s(e) - end - end - end - @op_count = 0 - @events.clear - end - end - - # - # a call to delete that tolerates missing .yaml files - # - def safe_delete (fei) - begin - self.delete(fei) - rescue Exception => e - # lwarn do - # "safe_delete() exception\n" + - # OpenWFE::exception_to_s(e) - # end - end - end - - # - # Adds the queue() method as an observer to the update and remove - # events of the expression pool. - # :update and :remove mean changes to expressions in the persistence - # that's why they are observed. - # - def observe_expool - - get_expression_pool.add_observer(:update) do |event, fei, fe| - ldebug { ":update for #{fei.to_debug_s}" } - queue(event, fei, fe) - end - get_expression_pool.add_observer(:remove) do |event, fei| - ldebug { ":remove for #{fei.to_debug_s}" } - queue(event, fei) - end - end end # # With this extension of YmalFileExpressionStorage, persistence occurs # in a separate thread, for a snappier response.