lib/openwfe/expool/errorjournal.rb in ruote-0.9.18 vs lib/openwfe/expool/errorjournal.rb in ruote-0.9.19
- old
+ new
@@ -1,34 +1,34 @@
#
#--
# Copyright (c) 2007-2008, John Mettraux, OpenWFE.org
# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
+#
+# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
-#
+#
# . Redistributions of source code must retain the above copyright notice, this
-# list of conditions and the following disclaimer.
-#
-# . Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
+# list of conditions and the following disclaimer.
+#
+# . Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
-#
+#
# . Neither the name of the "OpenWFE" nor the names of its contributors may be
# used to endorse or promote products derived from this software without
# specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#++
#
#
@@ -45,432 +45,418 @@
require 'openwfe/rudefinitions'
module OpenWFE
+ #
+ # Encapsulating process error information.
+ #
+ # Instances of this class may be used to replay_at_error
+ #
+ class ProcessError
+
#
- # Encapsulating process error information.
+ # When did the error occur.
#
- # Instances of this class may be used to replay_at_error
+ attr_reader :date
+
#
- class ProcessError
+ # The FlowExpressionId instance uniquely pointing at the expression
+ # which 'failed'.
+ #
+ attr_reader :fei
- #
- # When did the error occur.
- #
- attr_reader :date
+ #
+ # Generally something like :apply or :reply
+ #
+ attr_reader :message
- #
- # The FlowExpressionId instance uniquely pointing at the expression
- # which 'failed'.
- #
- attr_reader :fei
+ #
+ # The workitem accompanying the message (apply(workitem) /
+ # reply (workitem)).
+ #
+ attr_reader :workitem
- #
- # Generally something like :apply or :reply
- #
- attr_reader :message
+ #
+ # The String stack trace of the error.
+ #
+ attr_reader :stacktrace
- #
- # The workitem accompanying the message (apply(workitem) /
- # reply (workitem)).
- #
- attr_reader :workitem
+ #
+ # The error class (String) of the top level error
+ #
+ attr_reader :error_class
- #
- # The String stack trace of the error.
- #
- attr_reader :stacktrace
+ def initialize (*args)
- #
- # The error class (String) of the top level error
- #
- attr_reader :error_class
+ @date = Time.new
+ @fei, @message, @workitem, @error_class, @stacktrace = args
+ end
- def initialize (*args)
+ #
+ # Returns the parent workflow instance id (process id) of this
+ # ProcessError instance.
+ #
+ def wfid
+ @fei.parent_wfid
+ end
- @date = Time.new
- @fei, @message, @workitem, @error_class, @stacktrace = args
- end
+ alias :parent_wfid :wfid
+ #
+ # Produces a human readable version of the information in the
+ # ProcessError instance.
+ #
+ def to_s
+ s = ""
+ s << "-- #{self.class.name} --\n"
+ s << " date : #{@date}\n"
+ s << " fei : #{@fei}\n"
+ s << " message : #{@message}\n"
+ s << " workitem : ...\n"
+ s << " error_class : #{@error_class}\n"
+ s << " stacktrace : #{@stacktrace[0, 80]}\n"
+ s
+ end
+
+ #
+ # Returns a hash
+ #
+ def hash
+ to_s.hash
#
- # Returns the parent workflow instance id (process id) of this
- # ProcessError instance.
+ # a bit costly but as it's only used by resume_process()...
+ end
+
+ #
+ # Returns true if the other instance is a ProcessError and is the
+ # same error as this one.
+ #
+ def == (other)
+ return false unless other.is_a?(ProcessError)
+ return to_s == other.to_s
#
- def wfid
- @fei.parent_wfid
- end
+ # a bit costly but as it's only used by resume_process()...
+ end
+ end
- alias :parent_wfid :wfid
+ #
+ # This is a base class for all error journal, don't instantiate,
+ # work rather with InMemoryErrorJournal (only for testing envs though),
+ # or YamlErrorJournal.
+ #
+ class ErrorJournal < Service
+ include OwfeServiceLocator
+ include FeiMixin
- #
- # Produces a human readable version of the information in the
- # ProcessError instance.
- #
- def to_s
- s = ""
- s << "-- #{self.class.name} --\n"
- s << " date : #{@date}\n"
- s << " fei : #{@fei}\n"
- s << " message : #{@message}\n"
- s << " workitem : ...\n"
- s << " error_class : #{@error_class}\n"
- s << " stacktrace : #{@stacktrace[0, 80]}\n"
- s
- end
+ def initialize (service_name, application_context)
+ super
+
+ get_expression_pool.add_observer :error do |event, *args|
#
- # Returns a hash
- #
- def hash
- to_s.hash
- #
- # a bit costly but as it's only used by resume_process()...
+ # logs each error occurring in the expression pool
+
+ begin
+
+ record_error(ProcessError.new(*args))
+
+ rescue Exception => e
+ lwarn { "(failed to record error : #{e})" }
+ lwarn { "*** process error : \n" + args.join("\n") }
end
+ end
+ get_expression_pool.add_observer :terminate do |event, *args|
#
- # Returns true if the other instance is a ProcessError and is the
- # same error as this one.
- #
- def == (other)
- return false unless other.is_a?(ProcessError)
- return to_s == other.to_s
- #
- # a bit costly but as it's only used by resume_process()...
- end
+ # removes error log when a process terminates
+
+ fei = args[0].fei
+
+ remove_error_log fei.wfid \
+ if fei.is_in_parent_process?
+ end
end
#
- # This is a base class for all error journal, don't instantiate,
- # work rather with InMemoryErrorJournal (only for testing envs though),
- # or YamlErrorJournal.
+ # Returns true if the given wfid (or fei) (process instance id)
+ # has had errors.
#
- class ErrorJournal < Service
- include OwfeServiceLocator
- include FeiMixin
+ def has_errors? (wfid)
- def initialize (service_name, application_context)
+ get_error_log(wfid).size > 0
+ end
- super
+ #
+ # Takes care of removing an error from the error journal and
+ # they replays its process at that point.
+ #
+ def replay_at_error (error)
- get_expression_pool.add_observer :error do |event, *args|
- #
- # logs each error occurring in the expression pool
-
- begin
+ remove_errors(
+ error.fei.parent_wfid,
+ error)
- record_error(ProcessError.new(*args))
+ get_workqueue.push(
+ get_expression_pool,
+ :do_apply_reply,
+ error.message,
+ error.fei,
+ error.workitem)
+ end
- rescue Exception => e
- lwarn { "*** process error : \n" + args.join("\n") }
- end
- end
+ #
+ # A utility method : given a list of errors, will make sure that for
+ # each flow expression only one expression (the most recent) will get
+ # listed.
+ # Returns a list of errors, from the oldest to the most recent.
+ #
+ # Could be useful when considering a process where multiple replay
+ # attempts failed.
+ #
+ def ErrorJournal.reduce_error_list (errors)
- get_expression_pool.add_observer :terminate do |event, *args|
- #
- # removes error log when a process terminates
+ h = {}
- fei = args[0].fei
+ errors.each do |e|
+ h[e.fei] = e
+ #
+ # last errors do override previous errors for the
+ # same fei
+ end
- remove_error_log fei.wfid \
- if fei.is_in_parent_process?
- end
- end
+ h.values.sort do |error_a, error_b|
+ error_a.date <=> error_b.date
+ end
+ end
+ end
- #
- # Returns true if the given wfid (or fei) (process instance id)
- # has had errors.
- #
- def has_errors? (wfid)
+ #
+ # Stores all the errors in a hash... For testing purposes only, like
+ # the InMemoryExpressionStorage.
+ #
+ class InMemoryErrorJournal < ErrorJournal
- get_error_log(wfid).size > 0
- end
+ def initialize (service_name, application_context)
- #--
- #
- # Commented out : has no real value
- #
- # Replays the given process instance (wfid or fei) at its last
- # recorded error.
- #
- # There is an optional 'offset' parameter. Its default value is '0'.
- # Which means that the replay will occur at the last error.
- #
- # ejournal.replay_at_last_error('20070630-hiwakuzara', 1)
- #
- # Will replay a given process instance at its 1 to last error.
- #
- #def replay_at_last_error (wfid, offset=0)
- # wfid = extract_wfid(wfid)
- # log = get_error_log(wfid)
- # index = (-1 - offset)
- # error = log[index]
- # raise "no error for process '#{wfid}' at offset #{offset}" \
- # unless error
- # replay_at_error error
- #end
- #++
+ super
- #--
- #
- # Moved to the engine itself.
- #
- # Replays at a specific error (fetched with read_error_log()).
- #
- #def replay_at_error (error)
- # get_expression_pool.queue_work(
- # error.message,
- # error.fei,
- # error.workitem)
- #end
- #++
+ @per_processes = {}
+ end
- #
- # A utility method : given a list of errors, will make sure that for
- # each flow expression only one expression (the most recent) will get
- # listed.
- # Returns a list of errors, from the oldest to the most recent.
- #
- # Could be useful when considering a process where multiple replay
- # attempts failed.
- #
- def ErrorJournal.reduce_error_list (errors)
+ #
+ # Returns a list (older first) of the errors for a process
+ # instance identified by its fei or wfid.
+ #
+ # Will return an empty list if there a no errors for the process
+ # instances.
+ #
+ def get_error_log (wfid)
- h = {}
+ wfid = extract_wfid wfid, true
+ @per_processes[wfid] or []
+ end
- errors.each do |e|
- h[e.fei] = e
- #
- # last errors do override previous errors for the
- # same fei
- end
+ #
+ # Removes the error log for a process instance.
+ #
+ def remove_error_log (wfid)
- h.values.sort do |error_a, error_b|
- error_a.date <=> error_b.date
- end
- end
+ wfid = extract_wfid wfid, true
+ @per_processes.delete(wfid)
end
#
- # Stores all the errors in a hash... For testing purposes only, like
- # the InMemoryExpressionStorage.
+ # Removes a list of errors from the error journal.
#
- class InMemoryErrorJournal < ErrorJournal
+ # The 'errors' parameter may be a single error (instead of an array).
+ #
+ def remove_errors (wfid, errors)
- def initialize (service_name, application_context)
+ errors = Array(errors)
- super
+ log = get_error_log wfid
- @per_processes = {}
- end
+ errors.each do |e|
+ log.delete e
+ end
+ end
- #
- # Returns a list (older first) of the errors for a process
- # instance identified by its fei or wfid.
- #
- # Will return an empty list if there a no errors for the process
- # instances.
- #
- def get_error_log (wfid)
+ #
+ # Reads all the error logs currently stored.
+ # Returns a hash wfid --> error list.
+ #
+ def get_error_logs
- wfid = extract_wfid wfid, true
- @per_processes[wfid] or []
- end
+ @per_processes
+ end
- #
- # Removes the error log for a process instance.
- #
- def remove_error_log (wfid)
+ protected
- wfid = extract_wfid wfid, true
- @per_processes.delete(wfid)
- end
+ def record_error (error)
- #
- # Removes a list of errors from the error journal.
- #
- # The 'errors' parameter may be a single error (instead of an array).
- #
- def remove_errors (wfid, errors)
+ (@per_processes[error.wfid] ||= []) << error
+ # not that unreadable after all...
+ end
+ end
- errors = Array(errors)
+ #
+ # A Journal that only keep track of error in process execution.
+ #
+ class YamlErrorJournal < ErrorJournal
- log = get_error_log wfid
+ attr_reader :workdir
- errors.each do |e|
- log.delete e
- end
- end
+ def initialize (service_name, application_context)
- #
- # Reads all the error logs currently stored.
- # Returns a hash wfid --> error list.
- #
- def get_error_logs
+ require 'openwfe/storage/yamlcustom'
+ # making sure this file has been required at this point
+ # this yamlcustom thing prevents the whole OpenWFE ecosystem
+ # to get serialized :)
- @per_processes
- end
+ super
- protected
+ @workdir = get_work_directory + "/ejournal"
+ #@workdir = File.expand_path @workdir
- def record_error (error)
-
- (@per_processes[error.wfid] ||= []) << error
- # not that unreadable after all...
- end
+ FileUtils.makedirs(@workdir) unless File.exist?(@workdir)
end
-
+
#
- # A Journal that only keep track of error in process execution.
+ # Returns a list (older first) of the errors for a process
+ # instance identified by its fei or wfid.
#
- class YamlErrorJournal < ErrorJournal
+ # Will return an empty list if there a no errors for the process
+ # instances.
+ #
+ def get_error_log (wfid)
- attr_reader :workdir
+ path = get_path wfid
- def initialize (service_name, application_context)
+ return [] unless File.exist?(path)
- require 'openwfe/storage/yamlcustom'
- # making sure this file has been required at this point
- # this yamlcustom thing prevents the whole OpenWFE ecosystem
- # to get serialized :)
+ read_error_log_from path
+ end
- super
+ #
+ # Copies the error log of a process instance to a give path (and
+ # filename).
+ #
+ # Could be useful when one has to perform replay operations and wants
+ # to keep a copy of the original error[s].
+ #
+ def copy_error_log_to (wfid, path)
- @workdir = get_work_directory + "/ejournal"
+ original_path = get_path wfid
+ FileUtils.copy_file original_path, path
+ end
- FileUtils.makedirs(@workdir) unless File.exist?(@workdir)
- end
+ #
+ # Reads an error log from a specific file (possibly as copied over
+ # via copy_error_log_to()).
+ #
+ def read_error_log_from (path)
- #
- # Returns a list (older first) of the errors for a process
- # instance identified by its fei or wfid.
- #
- # Will return an empty list if there a no errors for the process
- # instances.
- #
- def get_error_log (wfid)
+ raise "no error log file at #{path}" unless File.exist?(path)
- path = get_path wfid
+ File.open(path) do |f|
+ s = YAML.load_stream f
+ s.documents
+ end
+ end
- return [] unless File.exist?(path)
+ #
+ # Removes the error log of a specific process instance.
+ # Could be a good idea after a succesful replay operation.
+ #
+ # 'wfid' may be either a workflow instance id (String) either
+ # a FlowExpressionId instance.
+ #
+ def remove_error_log (wfid)
- read_error_log_from path
- end
+ path = get_path wfid
- #
- # Copies the error log of a process instance to a give path (and
- # filename).
- #
- # Could be useful when one has to perform replay operations and wants
- # to keep a copy of the original error[s].
- #
- def copy_error_log_to (wfid, path)
+ File.delete(path) if File.exist?(path)
+ end
- original_path = get_path wfid
- FileUtils.copy_file original_path, path
- end
+ #
+ # Removes a list of errors from this error journal.
+ #
+ def remove_errors (wfid, errors)
- #
- # Reads an error log from a specific file (possibly as copied over
- # via copy_error_log_to()).
- #
- def read_error_log_from (path)
+ errors = Array(errors)
- raise "no error log file at #{path}" unless File.exist?(path)
+ # load all errors
- File.open(path) do |f|
- s = YAML.load_stream f
- s.documents
- end
- end
+ log = get_error_log wfid
- #
- # Removes the error log of a specific process instance.
- # Could be a good idea after a succesful replay operation.
- #
- # 'wfid' may be either a workflow instance id (String) either
- # a FlowExpressionId instance.
- #
- def remove_error_log (wfid)
+ # remove the given errors
- path = get_path wfid
+ errors.each do |e|
+ log.delete e
+ end
- File.delete(path) if File.exist?(path)
- end
+ # rewrite error file
- #
- # Removes a list of errors from this error journal.
- #
- def remove_errors (wfid, errors)
+ path = get_path wfid
- errors = Array(errors)
+ if log.size > 0
- # load all errors
+ File.open(path, "w") do |f|
+ log.each do |e|
+ f.puts e.to_yaml
+ end
+ end
+ else
- log = get_error_log wfid
+ File.delete path
+ end
+ end
- # remove the given errors
+ #
+ # Reads all the error logs currently stored.
+ # Returns a hash wfid --> error list.
+ #
+ def get_error_logs
- errors.each do |e|
- log.delete e
- end
+ result = {}
- # rewrite error file
+ Find.find(@workdir) do |path|
- path = get_path wfid
+ next unless path.match(/\.ejournal$/)
- if log.size > 0
+ log = read_error_log_from path
+ result[log.first.fei.wfid] = log
+ end
- File.open(path, "w") do |f|
- log.each do |e|
- f.puts e.to_yaml
- end
- end
- else
+ result
+ end
- File.delete path
- end
- end
+ protected
- #
- # Reads all the error logs currently stored.
- # Returns a hash wfid --> error list.
- #
- def get_error_logs
+ #
+ # logs the error as a yaml string in an error log file
+ # (there is one error log file per workflow instance).
+ #
+ def record_error (error)
- result = {}
+ path = get_path error.fei
- Find.find(@workdir) do |path|
- next unless path.endswith(".ejournal")
- wfid = path[0..-9]
- log = read_error_log wfid
- result[wfid] = log
- end
+ dirpath = File.dirname path
- result
+ FileUtils.mkdir_p(dirpath) unless File.exist?(dirpath)
+
+ File.open path, "a+" do |f|
+ f.puts error.to_yaml
end
+ end
- protected
+ #
+ # Returns the path to the error log file of a specific process
+ # instance.
+ #
+ def get_path (fei_or_wfid)
- #
- # logs the error as a yaml string in an error log file
- # (there is one error log file per workflow instance).
- #
- def record_error (error)
-
- path = get_path error.fei
-
- File.open(path, "a+") do |f|
- f.puts error.to_yaml
- end
- end
-
- #
- # Returns the path to the error log file of a specific process
- # instance.
- #
- def get_path (fei_or_wfid)
-
- @workdir + "/" + extract_wfid(fei_or_wfid, true) + ".ejournal"
- end
- end
+ @workdir + "/" + extract_wfid(fei_or_wfid, true) + ".ejournal"
+ end
+ end
end