lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.14 vs lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.15
- old
+ new
@@ -37,11 +37,11 @@
# Nicolas Modrzyk at openwfe.org
# John Mettraux at openwfe.org
#
require 'openwfe/utils'
-require 'openwfe/storage/yamlextras'
+require 'openwfe/storage/yamlcustom'
require 'openwfe/storage/yamlfilestorage'
require 'openwfe/expressions/flowexpression'
require 'openwfe/expressions/raw_xml'
#
@@ -57,16 +57,17 @@
# YAML expression storage. Expressions (atomic pieces of process instances)
# are stored in a hierarchy of YAML files.
#
class YamlFileExpressionStorage < YamlFileStorage
include OwfeServiceLocator
+ include ExpressionStorageBase
def initialize (service_name, application_context)
super(service_name, application_context, '/expool')
- observe_expool()
+ observe_expool
end
#
# Iterates on each expression that is of the given kind.
# Used for example by the expression pool when rescheduling.
@@ -144,28 +145,10 @@
[ md[1], md[2], md[3] ]
end
protected
- #
- # The actual binding of this storage as an observer of the
- # expression pool is done here.
- # It gives the opportunity to override this method with variants
- # (for example for delayed execution)
- #
- def observe_expool
-
- get_expression_pool.add_observer(:update) do |channel, fei, fe|
- #ldebug { ":update for #{fei.to_debug_s}" }
- self[fei] = fe
- end
- get_expression_pool.add_observer(:remove) do |channel, fei|
- #ldebug { ":remove for #{fei.to_debug_s}" }
- self.delete(fei)
- end
- end
-
def compute_file_path (fei)
return @basepath + "/engine_environment.yaml" \
if fei.workflow_instance_id == "0"
@@ -196,35 +179,22 @@
#end
#++
end
#
- # With this extension of YmalFileExpressionStorage, persistence occurs
- # in a separate thread, for a snappier response.
+ # This mixin gathers all the logic for a threaded expression storage,
+ # one that doesn't immediately stores workitems (removes overriding
+ # operations).
+ # Using this threaded storage brings a very important perf benefit.
#
- class ThreadedYamlFileExpressionStorage < YamlFileExpressionStorage
+ module ThreadedStorageMixin
- FREQ = "400" # milliseconds
+ THREADED_FREQ = "427" # milliseconds
#
# the frequency at which the event queue should be processed
#
- # TODO : make this configurable (param in the apcontext ?)
-
- def initialize (service_name, application_context)
-
- super
-
- @events = {}
- @op_count = 0
-
- start_processing_thread()
- #
- # which sets @thread_id
- end
-
- #
# Will take care of stopping the 'queue processing' thread.
#
def stop
get_scheduler.unschedule(@thread_id) if @thread_id
@@ -261,12 +231,15 @@
#
# starts the thread that does the actual persistence.
#
def start_processing_thread
- @thread_id = get_scheduler.schedule_every(FREQ) do
- process_queue()
+ @events = {}
+ @op_count = 0
+
+ @thread_id = get_scheduler.schedule_every THREADED_FREQ do
+ process_queue
end
end
#
# queues an event for later (well within a second) persistence
@@ -349,7 +322,24 @@
get_expression_pool.add_observer(:remove) do |event, fei|
ldebug { ":remove for #{fei.to_debug_s}" }
queue(event, fei)
end
end
+ end
+
+ #
+ # With this extension of YmalFileExpressionStorage, persistence occurs
+ # in a separate thread, for a snappier response.
+ #
+ class ThreadedYamlFileExpressionStorage < YamlFileExpressionStorage
+ include ThreadedStorageMixin
+
+ def initialize (service_name, application_context)
+
+ super
+
+ start_processing_thread()
+ #
+ # which sets @thread_id
+ end
end
end