lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.11 vs lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.12
- old
+ new
@@ -62,16 +62,10 @@
class YamlFileExpressionStorage < YamlFileStorage
include OwfeServiceLocator
def initialize (service_name, application_context)
- #path = if (@application_context)
- # @application_context[:work_directory]
- #else
- # DEFAULT_WORK_DIRECTORY
- #end
- #path = path + '/expool'
path = OpenWFE::get_work_directory + '/expool'
super(service_name, application_context, path)
observe_expool()
@@ -84,12 +78,10 @@
def each_of_kind (kind, &block)
return unless block
exp_names = get_expression_map.get_expression_names(kind)
- #require 'pp'
- #pp exp_names
each_object_path do |path|
#ldebug { "each_of_kind() path is #{path}" }
@@ -100,13 +92,28 @@
block.call expression
end
end
- def each (&block)
- each_object do |flow_expression|
- block.call(flow_expression.fei, flow_expression)
+ #
+ # "each flow expression" : this method awaits a block then, for
+ # each flow_expression in this storage, calls that block.
+ #
+ # If wfid_prefix is set, only expressions whose wfid (workflow instance
+ # id (process instance id)) will be taken into account.
+ #
+ def each (wfid_prefix=nil, &block)
+
+ each_object_path do |path|
+
+ a = self.class.split_file_path path
+ next unless a
+ wfid = a[0]
+ next if wfid_prefix and ( ! wfid.match "^#{wfid_prefix}")
+ flow_expression = load_object path
+
+ block.call flow_expression.fei, flow_expression
end
end
alias :real_each :each
@@ -117,13 +124,26 @@
each_object_path do |path|
s << path
s << "\n"
end
s << "==== . ====\n"
- return s
+ s
end
+ #
+ # Returns nil (if the path doesn't match an stored expression path)
+ # or an array [ workflow_instance_id, expression_id, expression_name ].
+ #
+ # This is a class method (not an instance one).
+ #
+ def self.split_file_path (path)
+
+ md = path.match %r{.*/(.*)__([\d.]*)_(.*).yaml}
+ return nil unless md
+ [ md[1], md[2], md[3] ]
+ end
+
protected
#
# The actual binding of this storage as an observer of the
# expression pool is done here.
@@ -147,29 +167,32 @@
return @basepath + "/engine_environment.yaml" \
if fei.workflow_instance_id == "0"
wfid = fei.parent_workflow_instance_id
- #a_wfid = OpenWFE::split_wfid(wfid)
a_wfid = get_wfid_generator.split_wfid(wfid)
@basepath +
a_wfid[-2] + "/" +
a_wfid[-1] + "/" +
fei.workflow_instance_id + "__" +
fei.expression_id + "_" +
fei.expression_name + ".yaml"
end
+ #
+ # Returns true if the path points to a file containing an
+ # expression whose name is in exp_names.
+ #
def matches (path, exp_names)
exp_names.each do |exp_name|
return true \
if OpenWFE::ends_with(path, "_#{exp_name}.yaml")
end
- return false
+ false
end
end
#
# With this extension of YmalFileExpressionStorage, persistence occurs
@@ -179,10 +202,13 @@
FREQ = "400" # milliseconds
#
# the frequency at which the event queue should be processed
+ #
+ # TODO : make this configurable (param in the apcontext ?)
+
def initialize (service_name, application_context)
super
@events = {}
@@ -219,25 +245,31 @@
#
# calls process_queue() before the call the super class each()
# method
#
- def each (&block)
+ def each (wfid_prefix=nil, &block)
process_queue()
super
end
protected
+ #
+ # starts the thread that does the actual persistence.
+ #
def start_processing_thread
@thread_id = get_scheduler.schedule_every(FREQ) do
process_queue()
end
end
+ #
+ # queues an event for later (well within a second) persistence
+ #
def queue (event, fei, fe=nil)
synchronize do
old_size = @events.size
@op_count += 1
@@ -249,11 +281,14 @@
"size #{old_size} -> #{@events.size}"
end
end
end
- def process_queue ()
+ #
+ # the actual "do persist" order
+ #
+ def process_queue
return unless @events.size > 0
#
# trying to exit as quickly as possible
@@ -294,9 +329,15 @@
# OpenWFE::exception_to_s(e)
# end
end
end
+ #
+ # Adds the queue() method as an observer to the update and remove
+ # events of the expression pool.
+ # :update and :remove mean changes to expressions in the persistence
+ # that's why they are observed.
+ #
def observe_expool
get_expression_pool.add_observer(:update) do |event, fei, fe|
ldebug { ":update for #{fei.to_debug_s}" }
queue(event, fei, fe)