lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.15 vs lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.16
- old
+ new
@@ -39,10 +39,11 @@
#
require 'openwfe/utils'
require 'openwfe/storage/yamlcustom'
require 'openwfe/storage/yamlfilestorage'
+require 'openwfe/expool/threadedexpstorage'
require 'openwfe/expressions/flowexpression'
require 'openwfe/expressions/raw_xml'
#
# making sure classes in those files are loaded
@@ -72,26 +73,24 @@
# Iterates on each expression that is of the given kind.
# Used for example by the expression pool when rescheduling.
#
def each_of_kind (kind, &block)
- return unless block
-
each_object_path do |path|
#ldebug { "each_of_kind() path is #{path}" }
#next unless matches(path, kind)
# was not OK in case of <bob activity="clean office" />
- expression = load_object(path)
+ expression = load_object path
next unless expression.is_a?(kind)
expression.application_context = @application_context
- block.call expression
+ block.call expression.fei, expression
end
end
#
# "each flow expression" : this method awaits a block then, for
@@ -104,12 +103,14 @@
each_object_path do |path|
a = self.class.split_file_path path
next unless a
+
wfid = a[0]
next if wfid_prefix and ( ! wfid.match "^#{wfid_prefix}")
+
flow_expression = load_object path
block.call flow_expression.fei, flow_expression
end
end
@@ -176,155 +177,9 @@
# if OpenWFE::ends_with(path, "_#{exp_name}.yaml")
# end
# false
#end
#++
- end
-
- #
- # This mixin gathers all the logic for a threaded expression storage,
- # one that doesn't immediately stores workitems (removes overriding
- # operations).
- # Using this threaded storage brings a very important perf benefit.
- #
- module ThreadedStorageMixin
-
- THREADED_FREQ = "427" # milliseconds
- #
- # the frequency at which the event queue should be processed
-
- #
- # Will take care of stopping the 'queue processing' thread.
- #
- def stop
-
- get_scheduler.unschedule(@thread_id) if @thread_id
-
- process_queue()
- #
- # flush every remaining events (especially the :delete ones)
- end
-
- #
- # calls process_queue() before the call the super class each_of_kind()
- # method
- #
- def each_of_kind (kind, &block)
-
- #ldebug { "each_of_kind()" }
-
- process_queue()
- super
- end
-
- #
- # calls process_queue() before the call the super class each()
- # method
- #
- def each (wfid_prefix=nil, &block)
-
- process_queue()
- super
- end
-
- protected
-
- #
- # starts the thread that does the actual persistence.
- #
- def start_processing_thread
-
- @events = {}
- @op_count = 0
-
- @thread_id = get_scheduler.schedule_every THREADED_FREQ do
- process_queue
- end
- end
-
- #
- # queues an event for later (well within a second) persistence
- #
- def queue (event, fei, fe=nil)
- synchronize do
-
- old_size = @events.size
- @op_count += 1
-
- @events[fei] = [ event, fei, fe ]
-
- ldebug do
- "queue() ops #{@op_count} "+
- "size #{old_size} -> #{@events.size}"
- end
- end
- end
-
- #
- # the actual "do persist" order
- #
- def process_queue
-
- return unless @events.size > 0
- #
- # trying to exit as quickly as possible
-
- ldebug do
- "process_queue() #{@events.size} events #{@op_count} ops"
- end
-
- synchronize do
- @events.each_value do |v|
- event = v[0]
- begin
- if event == :update
- self[v[1]] = v[2]
- else
- safe_delete(v[1])
- end
- rescue Exception => e
- lwarn do
- "process_queue() ':#{event}' exception\n" +
- OpenWFE::exception_to_s(e)
- end
- end
- end
- @op_count = 0
- @events.clear
- end
- end
-
- #
- # a call to delete that tolerates missing .yaml files
- #
- def safe_delete (fei)
- begin
- self.delete(fei)
- rescue Exception => e
- # lwarn do
- # "safe_delete() exception\n" +
- # OpenWFE::exception_to_s(e)
- # end
- end
- end
-
- #
- # Adds the queue() method as an observer to the update and remove
- # events of the expression pool.
- # :update and :remove mean changes to expressions in the persistence
- # that's why they are observed.
- #
- def observe_expool
-
- get_expression_pool.add_observer(:update) do |event, fei, fe|
- ldebug { ":update for #{fei.to_debug_s}" }
- queue(event, fei, fe)
- end
- get_expression_pool.add_observer(:remove) do |event, fei|
- ldebug { ":remove for #{fei.to_debug_s}" }
- queue(event, fei)
- end
- end
end
#
# With this extension of YmalFileExpressionStorage, persistence occurs
# in a separate thread, for a snappier response.