lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.4 vs lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.5

- old
+ new

@@ -38,12 +38,10 @@ # # Nicolas Modrzyk at openwfe.org # John Mettraux at openwfe.org # -#require 'find' - require 'openwfe/utils' require 'openwfe/rudefinitions' require 'openwfe/storage/yamlfilestorage' require 'openwfe/expressions/flowexpression' @@ -107,10 +105,12 @@ DEFAULT_WORK_DIRECTORY end path = path + '/expool' super(service_name, application_context, path) + + observe_expool() end # # Iterates on each expression that is of the given kind. # Used for example by the expression pool when rescheduling. @@ -147,22 +147,40 @@ s << "==== . ====\n" return s end protected + + # + # The actual binding of this storage as an observer of the + # expression pool is done here. + # It gives the opportunity to override this method with variants + # (for example for delayed execution) + # + def observe_expool + + get_expression_pool.add_observer(:update) do |channel, fei, fe| + #ldebug { ":update for #{fei.to_debug_s}" } + self[fei] = fe + end + get_expression_pool.add_observer(:remove) do |channel, fei| + #ldebug { ":remove for #{fei.to_debug_s}" } + self.delete(fei) + end + end def compute_file_path (fei) return @basepath + "/engine_environment.yaml" \ if fei.workflow_instance_id == "0" wfid = fei.parent_workflow_instance_id - @basepath + "/" + - wfid[-1, 1] + "/" + + @basepath + + #wfid[-3, 1] + "/" + wfid[-2, 1] + "/" + - wfid + "/" + + wfid[-1, 1] + "/" + fei.workflow_instance_id + "__" + fei.expression_id + "_" + fei.expression_name + ".yaml" end @@ -173,8 +191,134 @@ if OpenWFE::ends_with(path, "_#{exp_name}.yaml") end return false end - + end + + # + # With this extension of YmalFileExpressionStorage, persistence occurs + # in a separate thread, for a snappier response. + # + class ThreadedYamlFileExpressionStorage < YamlFileExpressionStorage + + def initialize (service_name, application_context) + + super + + @events = {} + @op_count = 0 + + @stopped = false + + start_processing_thread() + end + + # + # Will take care of stopping the 'queue processing' thread. + # + def stop + + @stopped = true + + process_queue() + # + # flush every remaining events (especially the :delete ones) + end + + # + # calls process_queue() before the call the super class each_of_kind() + # method + # + def each_of_kind (kind, &block) + + #ldebug { "each_of_kind()" } + + process_queue() + super + end + + protected + + def start_processing_thread + Thread.new do + while true + sleep(0.33) + break if @stopped + process_queue() + end + end + end + + def queue (event, fei, fe=nil) + synchronize do + + old_size = @events.size + @op_count += 1 + + @events[fei] = [ event, fei, fe ] + + ldebug do + "queue() ops #{@op_count} "+ + "size #{old_size} -> #{@events.size}" + end + end + end + + def process_queue () + + return unless @events.size > 0 + # + # trying to exit as quickly as possible + + ldebug do + "process_queue() #{@events.size} events #{@op_count} ops" + end + + synchronize do + @events.each_value do |v| + event = v[0] + begin + if event == :update + self[v[1]] = v[2] + else + safe_delete(v[1]) + end + rescue Exception => e + lwarn do + "process_queue() ':#{event}' exception\n" + + OpenWFE::exception_to_s(e) + end + end + end + @op_count = 0 + @events.clear + end + end + + # + # a call to delete that tolerates missing .yaml files + # + def safe_delete (fei) + begin + self.delete(fei) + rescue Exception => e + # lwarn do + # "safe_delete() exception\n" + + # OpenWFE::exception_to_s(e) + # end + end + end + + def observe_expool + + get_expression_pool.add_observer(:update) do |event, fei, fe| + ldebug { ":update for #{fei.to_debug_s}" } + queue(event, fei, fe) + end + get_expression_pool.add_observer(:remove) do |event, fei| + ldebug { ":remove for #{fei.to_debug_s}" } + queue(event, fei) + end + end end end