lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.14 vs lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.15

- old
+ new

@@ -37,11 +37,11 @@ # Nicolas Modrzyk at openwfe.org # John Mettraux at openwfe.org # require 'openwfe/utils' -require 'openwfe/storage/yamlextras' +require 'openwfe/storage/yamlcustom' require 'openwfe/storage/yamlfilestorage' require 'openwfe/expressions/flowexpression' require 'openwfe/expressions/raw_xml' # @@ -57,16 +57,17 @@ # YAML expression storage. Expressions (atomic pieces of process instances) # are stored in a hierarchy of YAML files. # class YamlFileExpressionStorage < YamlFileStorage include OwfeServiceLocator + include ExpressionStorageBase def initialize (service_name, application_context) super(service_name, application_context, '/expool') - observe_expool() + observe_expool end # # Iterates on each expression that is of the given kind. # Used for example by the expression pool when rescheduling. @@ -144,28 +145,10 @@ [ md[1], md[2], md[3] ] end protected - # - # The actual binding of this storage as an observer of the - # expression pool is done here. - # It gives the opportunity to override this method with variants - # (for example for delayed execution) - # - def observe_expool - - get_expression_pool.add_observer(:update) do |channel, fei, fe| - #ldebug { ":update for #{fei.to_debug_s}" } - self[fei] = fe - end - get_expression_pool.add_observer(:remove) do |channel, fei| - #ldebug { ":remove for #{fei.to_debug_s}" } - self.delete(fei) - end - end - def compute_file_path (fei) return @basepath + "/engine_environment.yaml" \ if fei.workflow_instance_id == "0" @@ -196,35 +179,22 @@ #end #++ end # - # With this extension of YmalFileExpressionStorage, persistence occurs - # in a separate thread, for a snappier response. + # This mixin gathers all the logic for a threaded expression storage, + # one that doesn't immediately stores workitems (removes overriding + # operations). + # Using this threaded storage brings a very important perf benefit. # - class ThreadedYamlFileExpressionStorage < YamlFileExpressionStorage + module ThreadedStorageMixin - FREQ = "400" # milliseconds + THREADED_FREQ = "427" # milliseconds # # the frequency at which the event queue should be processed # - # TODO : make this configurable (param in the apcontext ?) - - def initialize (service_name, application_context) - - super - - @events = {} - @op_count = 0 - - start_processing_thread() - # - # which sets @thread_id - end - - # # Will take care of stopping the 'queue processing' thread. # def stop get_scheduler.unschedule(@thread_id) if @thread_id @@ -261,12 +231,15 @@ # # starts the thread that does the actual persistence. # def start_processing_thread - @thread_id = get_scheduler.schedule_every(FREQ) do - process_queue() + @events = {} + @op_count = 0 + + @thread_id = get_scheduler.schedule_every THREADED_FREQ do + process_queue end end # # queues an event for later (well within a second) persistence @@ -349,7 +322,24 @@ get_expression_pool.add_observer(:remove) do |event, fei| ldebug { ":remove for #{fei.to_debug_s}" } queue(event, fei) end end + end + + # + # With this extension of YmalFileExpressionStorage, persistence occurs + # in a separate thread, for a snappier response. + # + class ThreadedYamlFileExpressionStorage < YamlFileExpressionStorage + include ThreadedStorageMixin + + def initialize (service_name, application_context) + + super + + start_processing_thread() + # + # which sets @thread_id + end end end