lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.11 vs lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.12

- old
+ new

@@ -62,16 +62,10 @@ class YamlFileExpressionStorage < YamlFileStorage include OwfeServiceLocator def initialize (service_name, application_context) - #path = if (@application_context) - # @application_context[:work_directory] - #else - # DEFAULT_WORK_DIRECTORY - #end - #path = path + '/expool' path = OpenWFE::get_work_directory + '/expool' super(service_name, application_context, path) observe_expool() @@ -84,12 +78,10 @@ def each_of_kind (kind, &block) return unless block exp_names = get_expression_map.get_expression_names(kind) - #require 'pp' - #pp exp_names each_object_path do |path| #ldebug { "each_of_kind() path is #{path}" } @@ -100,13 +92,28 @@ block.call expression end end - def each (&block) - each_object do |flow_expression| - block.call(flow_expression.fei, flow_expression) + # + # "each flow expression" : this method awaits a block then, for + # each flow_expression in this storage, calls that block. + # + # If wfid_prefix is set, only expressions whose wfid (workflow instance + # id (process instance id)) will be taken into account. + # + def each (wfid_prefix=nil, &block) + + each_object_path do |path| + + a = self.class.split_file_path path + next unless a + wfid = a[0] + next if wfid_prefix and ( ! wfid.match "^#{wfid_prefix}") + flow_expression = load_object path + + block.call flow_expression.fei, flow_expression end end alias :real_each :each @@ -117,13 +124,26 @@ each_object_path do |path| s << path s << "\n" end s << "==== . ====\n" - return s + s end + # + # Returns nil (if the path doesn't match an stored expression path) + # or an array [ workflow_instance_id, expression_id, expression_name ]. + # + # This is a class method (not an instance one). + # + def self.split_file_path (path) + + md = path.match %r{.*/(.*)__([\d.]*)_(.*).yaml} + return nil unless md + [ md[1], md[2], md[3] ] + end + protected # # The actual binding of this storage as an observer of the # expression pool is done here. @@ -147,29 +167,32 @@ return @basepath + "/engine_environment.yaml" \ if fei.workflow_instance_id == "0" wfid = fei.parent_workflow_instance_id - #a_wfid = OpenWFE::split_wfid(wfid) a_wfid = get_wfid_generator.split_wfid(wfid) @basepath + a_wfid[-2] + "/" + a_wfid[-1] + "/" + fei.workflow_instance_id + "__" + fei.expression_id + "_" + fei.expression_name + ".yaml" end + # + # Returns true if the path points to a file containing an + # expression whose name is in exp_names. + # def matches (path, exp_names) exp_names.each do |exp_name| return true \ if OpenWFE::ends_with(path, "_#{exp_name}.yaml") end - return false + false end end # # With this extension of YmalFileExpressionStorage, persistence occurs @@ -179,10 +202,13 @@ FREQ = "400" # milliseconds # # the frequency at which the event queue should be processed + # + # TODO : make this configurable (param in the apcontext ?) + def initialize (service_name, application_context) super @events = {} @@ -219,25 +245,31 @@ # # calls process_queue() before the call the super class each() # method # - def each (&block) + def each (wfid_prefix=nil, &block) process_queue() super end protected + # + # starts the thread that does the actual persistence. + # def start_processing_thread @thread_id = get_scheduler.schedule_every(FREQ) do process_queue() end end + # + # queues an event for later (well within a second) persistence + # def queue (event, fei, fe=nil) synchronize do old_size = @events.size @op_count += 1 @@ -249,11 +281,14 @@ "size #{old_size} -> #{@events.size}" end end end - def process_queue () + # + # the actual "do persist" order + # + def process_queue return unless @events.size > 0 # # trying to exit as quickly as possible @@ -294,9 +329,15 @@ # OpenWFE::exception_to_s(e) # end end end + # + # Adds the queue() method as an observer to the update and remove + # events of the expression pool. + # :update and :remove mean changes to expressions in the persistence + # that's why they are observed. + # def observe_expool get_expression_pool.add_observer(:update) do |event, fei, fe| ldebug { ":update for #{fei.to_debug_s}" } queue(event, fei, fe)