lib/openwfe/expool/journal.rb in openwferu-0.9.9 vs lib/openwfe/expool/journal.rb in openwferu-0.9.10

- old
+ new

@@ -41,10 +41,11 @@ require 'monitor' require 'fileutils' require 'openwfe/service' +require 'openwfe/omixins' require 'openwfe/rudefinitions' require 'openwfe/flowexpressionid' require 'openwfe/util/otime' require 'openwfe/storage/yamlextras' require 'openwfe/expool/journal_replay' @@ -56,10 +57,11 @@ # Keeping a replayable track of the events in an OpenWFEru engine # class Journal < Service include MonitorMixin, OwfeServiceLocator include JournalReplay + include FeiMixin attr_reader :workdir, :donedir FREQ = "1m" # @@ -75,11 +77,11 @@ @donedir = @workdir + "/done" FileUtils.makedirs(@donedir) unless File.exist?(@donedir) get_expression_pool.add_observer(:all) do |event, *args| - #ldebug { ":#{event} for #{args[0].to_debug_s}" } + #ldebug { ":#{event} for #{args[0].class.name}" } queue_event(event, *args) end @thread_id = get_scheduler.schedule_every(FREQ) do flush_buckets() @@ -107,11 +109,15 @@ return if event == :stop return if event == :launch return if event == :reschedule - wfid = args[0].parent_wfid - #puts "__#{wfid} : #{event}" + #wfid = args[0].parent_wfid + wfid = extract_fei(args[0]).parent_wfid + # + # maybe args[0] could be a FlowExpression instead + # of a FlowExpressionId instance + #puts "___#{event}__wfid : #{wfid}" e = serialize_event(event, *args) bucket = get_bucket(wfid) bucket << e