lib/openwfe/expool/journal_replay.rb in ruote-0.9.18 vs lib/openwfe/expool/journal_replay.rb in ruote-0.9.19
- old
+ new
@@ -1,34 +1,34 @@
#
#--
# Copyright (c) 2007-2008, John Mettraux, OpenWFE.org
# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
+#
+# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
-#
+#
# . Redistributions of source code must retain the above copyright notice, this
-# list of conditions and the following disclaimer.
-#
-# . Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
+# list of conditions and the following disclaimer.
+#
+# . Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
-#
+#
# . Neither the name of the "OpenWFE" nor the names of its contributors may be
# used to endorse or promote products derived from this software without
# specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#++
#
# $Id: definitions.rb 2725 2006-06-02 13:26:32Z jmettraux $
#
@@ -41,281 +41,281 @@
require 'openwfe/flowexpressionid'
module OpenWFE
-
+
+ #
+ # The code decicated to replay and reconstitute journal.
+ #
+ module JournalReplay
+
#
- # The code decicated to replay and reconstitute journal.
+ # Replays a given journal file.
#
- module JournalReplay
+ # The offset can be determined by running the analyze() method.
+ #
+ # If 'trigger_action' is set to true, the apply or reply or cancel
+ # action found at the given offset will be triggered.
+ #
+ def replay (file_path, offset, trigger_action=false)
- #
- # Replays a given journal file.
- #
- # The offset can be determined by running the analyze() method.
- #
- # If 'trigger_action' is set to true, the apply or reply or cancel
- # action found at the given offset will be triggered.
- #
- def replay (file_path, offset, trigger_action=false)
+ states = decompose(file_path)
- states = decompose(file_path)
+ state = nil
- state = nil
+ states.each do |s|
+ state = s if s.offset == offset
+ end
- states.each do |s|
- state = s if s.offset == offset
- end
+ raise "cannot replay offset #{offset}" unless state
- raise "cannot replay offset #{offset}" unless state
+ #puts "expstorage size 0 = #{get_expression_storage.size}"
- #puts "expstorage size 0 = #{get_expression_storage.size}"
+ state.static.each do |update|
+ flow_expression = update[3]
+ flow_expression.application_context = @application_context
+ get_expression_pool.update(flow_expression)
+ end
- state.static.each do |update|
- flow_expression = update[3]
- flow_expression.application_context = @application_context
- get_expression_pool.update(flow_expression)
- end
+ get_expression_pool.reschedule
- get_expression_pool.reschedule
+ #puts "expstorage size 1 = #{get_expression_storage.size}"
- #puts "expstorage size 1 = #{get_expression_storage.size}"
+ return unless trigger_action
- return unless trigger_action
+ #puts "sds : #{state.dynamic.size}"
- #puts "sds : #{state.dynamic.size}"
+ state.dynamic.each do |ply|
- state.dynamic.each do |ply|
+ message = ply[0]
+ #fei = extract_fei(ply[2])
+ fei_or_fexp = ply[2]
+ wi = ply[3]
- message = ply[0]
- #fei = extract_fei(ply[2])
- fei_or_fexp = ply[2]
- wi = ply[3]
+ fei_or_fexp.application_context = @application_context \
+ if fei_or_fexp.is_a?(FlowExpression)
- fei_or_fexp.application_context = @application_context \
- if fei_or_fexp.is_a?(FlowExpression)
-
- if wi
- #
- # apply, reply, reply_to_parent
- #
- #get_expression_pool.send message, fei, wi
- get_expression_pool.send message, fei_or_fexp, wi
- else
- #
- # cancel
- #
- get_expression_pool.send message, fei
- end
- end
+ if wi
+ #
+ # apply, reply, reply_to_parent
+ #
+ #get_expression_pool.send message, fei, wi
+ get_expression_pool.send message, fei_or_fexp, wi
+ else
+ #
+ # cancel
+ #
+ get_expression_pool.send message, fei
end
+ end
+ end
- #
- # Outputs a report of the each of the main events that the journal
- # traced.
- #
- # The output goes to the stdout.
- #
- # The output can be used to determine an offset number for a replay()
- # of the journal.
- #
- def analyze (file_path)
+ #
+ # Outputs a report of the each of the main events that the journal
+ # traced.
+ #
+ # The output goes to the stdout.
+ #
+ # The output can be used to determine an offset number for a replay()
+ # of the journal.
+ #
+ def analyze (file_path)
- states = decompose(file_path)
+ states = decompose(file_path)
- states.each do |state|
+ states.each do |state|
- next if state.dynamic.length < 1
+ next if state.dynamic.length < 1
- puts
- puts state.to_s
- puts
- end
- end
+ puts
+ puts state.to_s
+ puts
+ end
+ end
- #
- # Decomposes the given file_path into a list of states
- #
- def decompose (file_path)
+ #
+ # Decomposes the given file_path into a list of states
+ #
+ def decompose (file_path)
- do_decompose(load_events(file_path), [], nil, 0)
- end
+ do_decompose(load_events(file_path), [], nil, 0)
+ end
- #
- # Loads a journal file and return the content as a list of
- # events. This method is made available for unit tests, as
- # a public method it has not much interest.
- #
- def load_events (file_path)
+ #
+ # Loads a journal file and return the content as a list of
+ # events. This method is made available for unit tests, as
+ # a public method it has not much interest.
+ #
+ def load_events (file_path)
- File.open(file_path) do |f|
- s = YAML.load_stream f
- s.documents
- end
- end
+ File.open(file_path) do |f|
+ s = YAML.load_stream f
+ s.documents
+ end
+ end
- #
- # Takes an error event (as stored in the journal) and replays it
- # (usually you'd have to fix the engine conf before replaying
- # the error trigger)
- #
- # (Make sure to fix the cause of the error before triggering this
- # method)
- #
- def replay_at_error (error_source_event)
+ #
+ # Takes an error event (as stored in the journal) and replays it
+ # (usually you'd have to fix the engine conf before replaying
+ # the error trigger)
+ #
+ # (Make sure to fix the cause of the error before triggering this
+ # method)
+ #
+ def replay_at_error (error_source_event)
- get_workqueue.push(
- get_expression_pool,
- :do_apply_reply,
- error_source_event[3], # message (:apply for example)
- error_source_event[2], # fei or exp
- error_source_event[4]) # workitem
+ get_workqueue.push(
+ get_expression_pool,
+ :do_apply_reply,
+ error_source_event[3], # message (:apply for example)
+ error_source_event[2], # fei or exp
+ error_source_event[4]) # workitem
- # 0 is :error and 1 is the date and time of the error
+ # 0 is :error and 1 is the date and time of the error
- linfo do
- fei = extract_fei(error_source_event[2])
- "replay_at_error() #{error_source_event[3]} #{fei}"
- end
- end
+ linfo do
+ fei = extract_fei(error_source_event[2])
+ "replay_at_error() #{error_source_event[3]} #{fei}"
+ end
+ end
- #
- # Detects the last error that ocurred for a workflow instance
- # and replays at that point (see replay_at_error).
- #
- # (Make sure to fix the cause of the error before triggering this
- # method)
- #
- def replay_at_last_error (wfid)
+ #
+ # Detects the last error that ocurred for a workflow instance
+ # and replays at that point (see replay_at_error).
+ #
+ # (Make sure to fix the cause of the error before triggering this
+ # method)
+ #
+ def replay_at_last_error (wfid)
- events = load_events(get_path(wfid))
+ events = load_events(get_path(wfid))
- error_event = events.reverse.find do |evt|
- evt[0] == :error
- end
+ error_event = events.reverse.find do |evt|
+ evt[0] == :error
+ end
- replay_at_error(error_event)
- end
+ replay_at_error(error_event)
+ end
- protected
+ protected
- def do_decompose (events, result, previous_state, offset)
+ def do_decompose (events, result, previous_state, offset)
- current_state = extract_state(events, offset)
+ current_state = extract_state(events, offset)
- return result unless current_state
+ return result unless current_state
- result << current_state
+ result << current_state
- do_decompose(events, result, current_state, offset + 1)
- end
+ do_decompose(events, result, current_state, offset + 1)
+ end
- def extract_state (file, offset)
+ def extract_state (file, offset)
- events = if file.is_a?(String)
- load_events(file)
- else
- file
- end
+ events = if file.is_a?(String)
+ load_events(file)
+ else
+ file
+ end
- #
- # what to do with Sleep and When ?
+ #
+ # what to do with Sleep and When ?
- off = -1
- off = off - offset if offset
+ off = -1
+ off = off - offset if offset
- return nil if (events.size + off < 0)
+ return nil if (events.size + off < 0)
- events = events[0..off]
+ events = events[0..off]
- date = events[-1][1]
+ date = events[-1][1]
- participants = {}
+ participants = {}
- seen = {}
- static = []
- events.reverse.each do |e|
+ seen = {}
+ static = []
+ events.reverse.each do |e|
- etype = e[0]
- fei = e[2]
+ etype = e[0]
+ fei = e[2]
- next if etype == :apply
- next if etype == :reply
- next if etype == :reply_to_parent
- next if etype == :error
- next if seen[fei]
+ next if etype == :apply
+ next if etype == :reply
+ next if etype == :reply_to_parent
+ next if etype == :error
+ next if seen[fei]
- seen[fei] = true
+ seen[fei] = true
- next if etype == :remove
+ next if etype == :remove
- static << e
+ static << e
- participants[fei] = true \
- if e[3].is_a? OpenWFE::ParticipantExpression
- end
+ participants[fei] = true \
+ if e[3].is_a? OpenWFE::ParticipantExpression
+ end
- seen = {}
- dynamic = []
- events.reverse.each do |e|
- etype = e[0]
- fei = extract_fei e[2]
- next if etype == :update
- next if etype == :remove
- next if etype == :error
- #next if etype == :reply_to_parent
- next if seen[fei]
- next unless participants[fei]
- seen[fei] = true
- dynamic << e
- end
+ seen = {}
+ dynamic = []
+ events.reverse.each do |e|
+ etype = e[0]
+ fei = extract_fei e[2]
+ next if etype == :update
+ next if etype == :remove
+ next if etype == :error
+ #next if etype == :reply_to_parent
+ next if seen[fei]
+ next unless participants[fei]
+ seen[fei] = true
+ dynamic << e
+ end
- ExpoolState.new(offset, date, static, dynamic, participants)
- end
+ ExpoolState.new(offset, date, static, dynamic, participants)
+ end
- class ExpoolState
- include FeiMixin
+ class ExpoolState
+ include FeiMixin
- attr_accessor \
- :offset,
- :date,
- :static,
- :dynamic,
- :participants
+ attr_accessor \
+ :offset,
+ :date,
+ :static,
+ :dynamic,
+ :participants
- def initialize (offset, date, static, dynamic, participants)
+ def initialize (offset, date, static, dynamic, participants)
- @offset = offset
- @date = date
- @static = static
- @dynamic = dynamic
- @participants = participants
- end
+ @offset = offset
+ @date = date
+ @static = static
+ @dynamic = dynamic
+ @participants = participants
+ end
- def to_s
+ def to_s
- s = " ===== offset : #{@offset} #{@date} =====\n"
+ s = " ===== offset : #{@offset} #{@date} =====\n"
- s << "\n"
- s << "static :\n"
- @static.each do |e|
- s << " - #{e[0]} #{extract_fei(e[2]).to_short_s}\n"
- end
+ s << "\n"
+ s << "static :\n"
+ @static.each do |e|
+ s << " - #{e[0]} #{extract_fei(e[2]).to_short_s}\n"
+ end
- s << "\n"
- s << "dynamic :\n"
- @dynamic.each do |e|
- s << " - #{e[0]} #{extract_fei(e[2]).to_short_s}\n"
- end
+ s << "\n"
+ s << "dynamic :\n"
+ @dynamic.each do |e|
+ s << " - #{e[0]} #{extract_fei(e[2]).to_short_s}\n"
+ end
- #s << "\n"
- #s << "participants :\n"
- #@participants.each do |fei, v|
- # s << " - #{fei.to_short_s}\n"
- #end
+ #s << "\n"
+ #s << "participants :\n"
+ #@participants.each do |fei, v|
+ # s << " - #{fei.to_short_s}\n"
+ #end
- s
- end
- end
- end
+ s
+ end
+ end
+ end
end