lib/openwfe/expool/journal_replay.rb in openwferu-0.9.7 vs lib/openwfe/expool/journal_replay.rb in openwferu-0.9.8

- old
+ new

@@ -43,79 +43,221 @@ module OpenWFE # - # Keeping a replayable track of the events in an OpenWFEru engine + # The code decicated to replay and reconstitute journal. # module JournalReplay - def replay (file_path) + # + # Replays a given journal file. + # + # The offset can be determined by running the analyze() method. + # + # If 'trigger_action' is set to true, the apply or reply or cancel + # action found at the given offset will be triggered. + # + def replay (file_path, offset, trigger_action=false) - events = File.open(file_path) do |f| - s = YAML.load_stream f - s.documents + states = decompose(file_path) + + state = nil + + states.each do |s| + state = s if s.offset == offset end - # - # what to do with Sleep and When ? + raise "cannot replay offset #{offset}" unless state - events = events[0..-20] + #puts "expstorage size 0 = #{get_expression_storage.size}" - participants = {} + state.static.each do |update| + flow_expression = update[3] + flow_expression.application_context = @application_context + get_expression_pool.update(flow_expression) + end - seen = {} - static = [] - events.reverse.each do |e| + get_expression_pool.reschedule - etype = e[0] - fei = e[2] + #puts "expstorage size 1 = #{get_expression_storage.size}" - next if etype == :apply - next if etype == :reply - next if etype == :reply_to_parent - next if seen[fei] + return unless trigger_action - seen[fei] = true + state.dynamic.each do |ply| - next if etype == :remove + message = ply[0] + fei = ply[2] + wi = ply[3] - static << e + if wi + # + # apply, reply, reply_to_parent + # + get_expression_pool.send message, fei, wi + else + # + # cancel + # + get_expression_pool.send message, fei + end + end + end - participants[fei] = true \ - if e[3].is_a? OpenWFE::ParticipantExpression + # + # Outputs a report of the each of the main events that the journal + # traced. + # + # The output goes to the stdout. + # + # The output can be used to determine an offset number for a replay() + # of the journal. + # + def analyze (file_path) + + states = decompose(file_path) + + states.each do |state| + + next if state.dynamic.length < 1 + + puts + puts state.to_s + puts end + end - seen = {} - dynamic = [] - events.reverse.each do |e| - etype = e[0] - fei = e[2] - next if etype == :update - next if etype == :remove - #next if etype == :reply_to_parent - next if seen[fei] - next unless participants[fei] - seen[fei] = true - dynamic << e + # + # Decomposes the given file_path into a list of states + # + def decompose (file_path) + + do_decompose(load_events(file_path), [], nil, 0) + end + + protected + + def do_decompose (events, result, previous_state, offset) + + current_state = extract_state(events, offset) + + return result unless current_state + + result << current_state + + do_decompose(events, result, current_state, offset + 1) end - puts - puts "static :" - static.each do |e| - puts " - #{e[0]} #{e[2].to_short_s}" + def load_events (file_path) + + File.open(file_path) do |f| + s = YAML.load_stream f + s.documents + end end - puts - puts "participants :" - participants.each do |fei, v| - puts " - #{fei.to_short_s}" + def extract_state (file, offset) + + events = if file.is_a?(String) + load_events(file) + else + file + end + + # + # what to do with Sleep and When ? + + off = -1 + off = off - offset if offset + + return nil if (events.size + off < 0) + + events = events[0..off] + + date = events[-1][1] + + participants = {} + + seen = {} + static = [] + events.reverse.each do |e| + + etype = e[0] + fei = e[2] + + next if etype == :apply + next if etype == :reply + next if etype == :reply_to_parent + next if seen[fei] + + seen[fei] = true + + next if etype == :remove + + static << e + + participants[fei] = true \ + if e[3].is_a? OpenWFE::ParticipantExpression + end + + seen = {} + dynamic = [] + events.reverse.each do |e| + etype = e[0] + fei = e[2] + next if etype == :update + next if etype == :remove + #next if etype == :reply_to_parent + next if seen[fei] + next unless participants[fei] + seen[fei] = true + dynamic << e + end + + ExpoolState.new(offset, date, static, dynamic, participants) end - puts - puts "dynamic :" - dynamic.each do |e| - puts " - #{e[0]} #{e[2].to_short_s}" + class ExpoolState + + attr_accessor \ + :offset, + :date, + :static, + :dynamic, + :participants + + def initialize (offset, date, static, dynamic, participants) + + @offset = offset + @date = date + @static = static + @dynamic = dynamic + @participants = participants + end + + def to_s + + s = " ===== offset : #{@offset} #{@date} =====\n" + + s << "\n" + s << "static :\n" + @static.each do |e| + s << " - #{e[0]} #{e[2].to_short_s}\n" + end + + s << "\n" + s << "dynamic :\n" + @dynamic.each do |e| + s << " - #{e[0]} #{e[2].to_short_s}\n" + end + + #s << "\n" + #s << "participants :\n" + #@participants.each do |fei, v| + # s << " - #{fei.to_short_s}\n" + #end + + s + end end - end end end