lib/openwfe/expool/journal_replay.rb in openwferu-0.9.7 vs lib/openwfe/expool/journal_replay.rb in openwferu-0.9.8
- old
+ new
@@ -43,79 +43,221 @@
module OpenWFE
#
- # Keeping a replayable track of the events in an OpenWFEru engine
+ # The code decicated to replay and reconstitute journal.
#
module JournalReplay
- def replay (file_path)
+ #
+ # Replays a given journal file.
+ #
+ # The offset can be determined by running the analyze() method.
+ #
+ # If 'trigger_action' is set to true, the apply or reply or cancel
+ # action found at the given offset will be triggered.
+ #
+ def replay (file_path, offset, trigger_action=false)
- events = File.open(file_path) do |f|
- s = YAML.load_stream f
- s.documents
+ states = decompose(file_path)
+
+ state = nil
+
+ states.each do |s|
+ state = s if s.offset == offset
end
- #
- # what to do with Sleep and When ?
+ raise "cannot replay offset #{offset}" unless state
- events = events[0..-20]
+ #puts "expstorage size 0 = #{get_expression_storage.size}"
- participants = {}
+ state.static.each do |update|
+ flow_expression = update[3]
+ flow_expression.application_context = @application_context
+ get_expression_pool.update(flow_expression)
+ end
- seen = {}
- static = []
- events.reverse.each do |e|
+ get_expression_pool.reschedule
- etype = e[0]
- fei = e[2]
+ #puts "expstorage size 1 = #{get_expression_storage.size}"
- next if etype == :apply
- next if etype == :reply
- next if etype == :reply_to_parent
- next if seen[fei]
+ return unless trigger_action
- seen[fei] = true
+ state.dynamic.each do |ply|
- next if etype == :remove
+ message = ply[0]
+ fei = ply[2]
+ wi = ply[3]
- static << e
+ if wi
+ #
+ # apply, reply, reply_to_parent
+ #
+ get_expression_pool.send message, fei, wi
+ else
+ #
+ # cancel
+ #
+ get_expression_pool.send message, fei
+ end
+ end
+ end
- participants[fei] = true \
- if e[3].is_a? OpenWFE::ParticipantExpression
+ #
+ # Outputs a report of the each of the main events that the journal
+ # traced.
+ #
+ # The output goes to the stdout.
+ #
+ # The output can be used to determine an offset number for a replay()
+ # of the journal.
+ #
+ def analyze (file_path)
+
+ states = decompose(file_path)
+
+ states.each do |state|
+
+ next if state.dynamic.length < 1
+
+ puts
+ puts state.to_s
+ puts
end
+ end
- seen = {}
- dynamic = []
- events.reverse.each do |e|
- etype = e[0]
- fei = e[2]
- next if etype == :update
- next if etype == :remove
- #next if etype == :reply_to_parent
- next if seen[fei]
- next unless participants[fei]
- seen[fei] = true
- dynamic << e
+ #
+ # Decomposes the given file_path into a list of states
+ #
+ def decompose (file_path)
+
+ do_decompose(load_events(file_path), [], nil, 0)
+ end
+
+ protected
+
+ def do_decompose (events, result, previous_state, offset)
+
+ current_state = extract_state(events, offset)
+
+ return result unless current_state
+
+ result << current_state
+
+ do_decompose(events, result, current_state, offset + 1)
end
- puts
- puts "static :"
- static.each do |e|
- puts " - #{e[0]} #{e[2].to_short_s}"
+ def load_events (file_path)
+
+ File.open(file_path) do |f|
+ s = YAML.load_stream f
+ s.documents
+ end
end
- puts
- puts "participants :"
- participants.each do |fei, v|
- puts " - #{fei.to_short_s}"
+ def extract_state (file, offset)
+
+ events = if file.is_a?(String)
+ load_events(file)
+ else
+ file
+ end
+
+ #
+ # what to do with Sleep and When ?
+
+ off = -1
+ off = off - offset if offset
+
+ return nil if (events.size + off < 0)
+
+ events = events[0..off]
+
+ date = events[-1][1]
+
+ participants = {}
+
+ seen = {}
+ static = []
+ events.reverse.each do |e|
+
+ etype = e[0]
+ fei = e[2]
+
+ next if etype == :apply
+ next if etype == :reply
+ next if etype == :reply_to_parent
+ next if seen[fei]
+
+ seen[fei] = true
+
+ next if etype == :remove
+
+ static << e
+
+ participants[fei] = true \
+ if e[3].is_a? OpenWFE::ParticipantExpression
+ end
+
+ seen = {}
+ dynamic = []
+ events.reverse.each do |e|
+ etype = e[0]
+ fei = e[2]
+ next if etype == :update
+ next if etype == :remove
+ #next if etype == :reply_to_parent
+ next if seen[fei]
+ next unless participants[fei]
+ seen[fei] = true
+ dynamic << e
+ end
+
+ ExpoolState.new(offset, date, static, dynamic, participants)
end
- puts
- puts "dynamic :"
- dynamic.each do |e|
- puts " - #{e[0]} #{e[2].to_short_s}"
+ class ExpoolState
+
+ attr_accessor \
+ :offset,
+ :date,
+ :static,
+ :dynamic,
+ :participants
+
+ def initialize (offset, date, static, dynamic, participants)
+
+ @offset = offset
+ @date = date
+ @static = static
+ @dynamic = dynamic
+ @participants = participants
+ end
+
+ def to_s
+
+ s = " ===== offset : #{@offset} #{@date} =====\n"
+
+ s << "\n"
+ s << "static :\n"
+ @static.each do |e|
+ s << " - #{e[0]} #{e[2].to_short_s}\n"
+ end
+
+ s << "\n"
+ s << "dynamic :\n"
+ @dynamic.each do |e|
+ s << " - #{e[0]} #{e[2].to_short_s}\n"
+ end
+
+ #s << "\n"
+ #s << "participants :\n"
+ #@participants.each do |fei, v|
+ # s << " - #{fei.to_short_s}\n"
+ #end
+
+ s
+ end
end
- end
end
end