lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.4 vs lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.5
- old
+ new
@@ -38,12 +38,10 @@
#
# Nicolas Modrzyk at openwfe.org
# John Mettraux at openwfe.org
#
-#require 'find'
-
require 'openwfe/utils'
require 'openwfe/rudefinitions'
require 'openwfe/storage/yamlfilestorage'
require 'openwfe/expressions/flowexpression'
@@ -107,10 +105,12 @@
DEFAULT_WORK_DIRECTORY
end
path = path + '/expool'
super(service_name, application_context, path)
+
+ observe_expool()
end
#
# Iterates on each expression that is of the given kind.
# Used for example by the expression pool when rescheduling.
@@ -147,22 +147,40 @@
s << "==== . ====\n"
return s
end
protected
+
+ #
+ # The actual binding of this storage as an observer of the
+ # expression pool is done here.
+ # It gives the opportunity to override this method with variants
+ # (for example for delayed execution)
+ #
+ def observe_expool
+
+ get_expression_pool.add_observer(:update) do |channel, fei, fe|
+ #ldebug { ":update for #{fei.to_debug_s}" }
+ self[fei] = fe
+ end
+ get_expression_pool.add_observer(:remove) do |channel, fei|
+ #ldebug { ":remove for #{fei.to_debug_s}" }
+ self.delete(fei)
+ end
+ end
def compute_file_path (fei)
return @basepath + "/engine_environment.yaml" \
if fei.workflow_instance_id == "0"
wfid = fei.parent_workflow_instance_id
- @basepath + "/" +
- wfid[-1, 1] + "/" +
+ @basepath +
+ #wfid[-3, 1] + "/" +
wfid[-2, 1] + "/" +
- wfid + "/" +
+ wfid[-1, 1] + "/" +
fei.workflow_instance_id + "__" +
fei.expression_id + "_" +
fei.expression_name + ".yaml"
end
@@ -173,8 +191,134 @@
if OpenWFE::ends_with(path, "_#{exp_name}.yaml")
end
return false
end
-
+ end
+
+ #
+ # With this extension of YmalFileExpressionStorage, persistence occurs
+ # in a separate thread, for a snappier response.
+ #
+ class ThreadedYamlFileExpressionStorage < YamlFileExpressionStorage
+
+ def initialize (service_name, application_context)
+
+ super
+
+ @events = {}
+ @op_count = 0
+
+ @stopped = false
+
+ start_processing_thread()
+ end
+
+ #
+ # Will take care of stopping the 'queue processing' thread.
+ #
+ def stop
+
+ @stopped = true
+
+ process_queue()
+ #
+ # flush every remaining events (especially the :delete ones)
+ end
+
+ #
+ # calls process_queue() before the call the super class each_of_kind()
+ # method
+ #
+ def each_of_kind (kind, &block)
+
+ #ldebug { "each_of_kind()" }
+
+ process_queue()
+ super
+ end
+
+ protected
+
+ def start_processing_thread
+ Thread.new do
+ while true
+ sleep(0.33)
+ break if @stopped
+ process_queue()
+ end
+ end
+ end
+
+ def queue (event, fei, fe=nil)
+ synchronize do
+
+ old_size = @events.size
+ @op_count += 1
+
+ @events[fei] = [ event, fei, fe ]
+
+ ldebug do
+ "queue() ops #{@op_count} "+
+ "size #{old_size} -> #{@events.size}"
+ end
+ end
+ end
+
+ def process_queue ()
+
+ return unless @events.size > 0
+ #
+ # trying to exit as quickly as possible
+
+ ldebug do
+ "process_queue() #{@events.size} events #{@op_count} ops"
+ end
+
+ synchronize do
+ @events.each_value do |v|
+ event = v[0]
+ begin
+ if event == :update
+ self[v[1]] = v[2]
+ else
+ safe_delete(v[1])
+ end
+ rescue Exception => e
+ lwarn do
+ "process_queue() ':#{event}' exception\n" +
+ OpenWFE::exception_to_s(e)
+ end
+ end
+ end
+ @op_count = 0
+ @events.clear
+ end
+ end
+
+ #
+ # a call to delete that tolerates missing .yaml files
+ #
+ def safe_delete (fei)
+ begin
+ self.delete(fei)
+ rescue Exception => e
+ # lwarn do
+ # "safe_delete() exception\n" +
+ # OpenWFE::exception_to_s(e)
+ # end
+ end
+ end
+
+ def observe_expool
+
+ get_expression_pool.add_observer(:update) do |event, fei, fe|
+ ldebug { ":update for #{fei.to_debug_s}" }
+ queue(event, fei, fe)
+ end
+ get_expression_pool.add_observer(:remove) do |event, fei|
+ ldebug { ":remove for #{fei.to_debug_s}" }
+ queue(event, fei)
+ end
+ end
end
end