lib/openwfe/expool/errorjournal.rb in ruote-0.9.18 vs lib/openwfe/expool/errorjournal.rb in ruote-0.9.19

- old
+ new

@@ -1,34 +1,34 @@ # #-- # Copyright (c) 2007-2008, John Mettraux, OpenWFE.org # All rights reserved. -# -# Redistribution and use in source and binary forms, with or without +# +# Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: -# +# # . Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# . Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation +# list of conditions and the following disclaimer. +# +# . Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. -# +# # . Neither the name of the "OpenWFE" nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. #++ # # @@ -45,432 +45,418 @@ require 'openwfe/rudefinitions' module OpenWFE + # + # Encapsulating process error information. + # + # Instances of this class may be used to replay_at_error + # + class ProcessError + # - # Encapsulating process error information. + # When did the error occur. # - # Instances of this class may be used to replay_at_error + attr_reader :date + # - class ProcessError + # The FlowExpressionId instance uniquely pointing at the expression + # which 'failed'. + # + attr_reader :fei - # - # When did the error occur. - # - attr_reader :date + # + # Generally something like :apply or :reply + # + attr_reader :message - # - # The FlowExpressionId instance uniquely pointing at the expression - # which 'failed'. - # - attr_reader :fei + # + # The workitem accompanying the message (apply(workitem) / + # reply (workitem)). + # + attr_reader :workitem - # - # Generally something like :apply or :reply - # - attr_reader :message + # + # The String stack trace of the error. + # + attr_reader :stacktrace - # - # The workitem accompanying the message (apply(workitem) / - # reply (workitem)). - # - attr_reader :workitem + # + # The error class (String) of the top level error + # + attr_reader :error_class - # - # The String stack trace of the error. - # - attr_reader :stacktrace + def initialize (*args) - # - # The error class (String) of the top level error - # - attr_reader :error_class + @date = Time.new + @fei, @message, @workitem, @error_class, @stacktrace = args + end - def initialize (*args) + # + # Returns the parent workflow instance id (process id) of this + # ProcessError instance. + # + def wfid + @fei.parent_wfid + end - @date = Time.new - @fei, @message, @workitem, @error_class, @stacktrace = args - end + alias :parent_wfid :wfid + # + # Produces a human readable version of the information in the + # ProcessError instance. + # + def to_s + s = "" + s << "-- #{self.class.name} --\n" + s << " date : #{@date}\n" + s << " fei : #{@fei}\n" + s << " message : #{@message}\n" + s << " workitem : ...\n" + s << " error_class : #{@error_class}\n" + s << " stacktrace : #{@stacktrace[0, 80]}\n" + s + end + + # + # Returns a hash + # + def hash + to_s.hash # - # Returns the parent workflow instance id (process id) of this - # ProcessError instance. + # a bit costly but as it's only used by resume_process()... + end + + # + # Returns true if the other instance is a ProcessError and is the + # same error as this one. + # + def == (other) + return false unless other.is_a?(ProcessError) + return to_s == other.to_s # - def wfid - @fei.parent_wfid - end + # a bit costly but as it's only used by resume_process()... + end + end - alias :parent_wfid :wfid + # + # This is a base class for all error journal, don't instantiate, + # work rather with InMemoryErrorJournal (only for testing envs though), + # or YamlErrorJournal. + # + class ErrorJournal < Service + include OwfeServiceLocator + include FeiMixin - # - # Produces a human readable version of the information in the - # ProcessError instance. - # - def to_s - s = "" - s << "-- #{self.class.name} --\n" - s << " date : #{@date}\n" - s << " fei : #{@fei}\n" - s << " message : #{@message}\n" - s << " workitem : ...\n" - s << " error_class : #{@error_class}\n" - s << " stacktrace : #{@stacktrace[0, 80]}\n" - s - end + def initialize (service_name, application_context) + super + + get_expression_pool.add_observer :error do |event, *args| # - # Returns a hash - # - def hash - to_s.hash - # - # a bit costly but as it's only used by resume_process()... + # logs each error occurring in the expression pool + + begin + + record_error(ProcessError.new(*args)) + + rescue Exception => e + lwarn { "(failed to record error : #{e})" } + lwarn { "*** process error : \n" + args.join("\n") } end + end + get_expression_pool.add_observer :terminate do |event, *args| # - # Returns true if the other instance is a ProcessError and is the - # same error as this one. - # - def == (other) - return false unless other.is_a?(ProcessError) - return to_s == other.to_s - # - # a bit costly but as it's only used by resume_process()... - end + # removes error log when a process terminates + + fei = args[0].fei + + remove_error_log fei.wfid \ + if fei.is_in_parent_process? + end end # - # This is a base class for all error journal, don't instantiate, - # work rather with InMemoryErrorJournal (only for testing envs though), - # or YamlErrorJournal. + # Returns true if the given wfid (or fei) (process instance id) + # has had errors. # - class ErrorJournal < Service - include OwfeServiceLocator - include FeiMixin + def has_errors? (wfid) - def initialize (service_name, application_context) + get_error_log(wfid).size > 0 + end - super + # + # Takes care of removing an error from the error journal and + # they replays its process at that point. + # + def replay_at_error (error) - get_expression_pool.add_observer :error do |event, *args| - # - # logs each error occurring in the expression pool - - begin + remove_errors( + error.fei.parent_wfid, + error) - record_error(ProcessError.new(*args)) + get_workqueue.push( + get_expression_pool, + :do_apply_reply, + error.message, + error.fei, + error.workitem) + end - rescue Exception => e - lwarn { "*** process error : \n" + args.join("\n") } - end - end + # + # A utility method : given a list of errors, will make sure that for + # each flow expression only one expression (the most recent) will get + # listed. + # Returns a list of errors, from the oldest to the most recent. + # + # Could be useful when considering a process where multiple replay + # attempts failed. + # + def ErrorJournal.reduce_error_list (errors) - get_expression_pool.add_observer :terminate do |event, *args| - # - # removes error log when a process terminates + h = {} - fei = args[0].fei + errors.each do |e| + h[e.fei] = e + # + # last errors do override previous errors for the + # same fei + end - remove_error_log fei.wfid \ - if fei.is_in_parent_process? - end - end + h.values.sort do |error_a, error_b| + error_a.date <=> error_b.date + end + end + end - # - # Returns true if the given wfid (or fei) (process instance id) - # has had errors. - # - def has_errors? (wfid) + # + # Stores all the errors in a hash... For testing purposes only, like + # the InMemoryExpressionStorage. + # + class InMemoryErrorJournal < ErrorJournal - get_error_log(wfid).size > 0 - end + def initialize (service_name, application_context) - #-- - # - # Commented out : has no real value - # - # Replays the given process instance (wfid or fei) at its last - # recorded error. - # - # There is an optional 'offset' parameter. Its default value is '0'. - # Which means that the replay will occur at the last error. - # - # ejournal.replay_at_last_error('20070630-hiwakuzara', 1) - # - # Will replay a given process instance at its 1 to last error. - # - #def replay_at_last_error (wfid, offset=0) - # wfid = extract_wfid(wfid) - # log = get_error_log(wfid) - # index = (-1 - offset) - # error = log[index] - # raise "no error for process '#{wfid}' at offset #{offset}" \ - # unless error - # replay_at_error error - #end - #++ + super - #-- - # - # Moved to the engine itself. - # - # Replays at a specific error (fetched with read_error_log()). - # - #def replay_at_error (error) - # get_expression_pool.queue_work( - # error.message, - # error.fei, - # error.workitem) - #end - #++ + @per_processes = {} + end - # - # A utility method : given a list of errors, will make sure that for - # each flow expression only one expression (the most recent) will get - # listed. - # Returns a list of errors, from the oldest to the most recent. - # - # Could be useful when considering a process where multiple replay - # attempts failed. - # - def ErrorJournal.reduce_error_list (errors) + # + # Returns a list (older first) of the errors for a process + # instance identified by its fei or wfid. + # + # Will return an empty list if there a no errors for the process + # instances. + # + def get_error_log (wfid) - h = {} + wfid = extract_wfid wfid, true + @per_processes[wfid] or [] + end - errors.each do |e| - h[e.fei] = e - # - # last errors do override previous errors for the - # same fei - end + # + # Removes the error log for a process instance. + # + def remove_error_log (wfid) - h.values.sort do |error_a, error_b| - error_a.date <=> error_b.date - end - end + wfid = extract_wfid wfid, true + @per_processes.delete(wfid) end # - # Stores all the errors in a hash... For testing purposes only, like - # the InMemoryExpressionStorage. + # Removes a list of errors from the error journal. # - class InMemoryErrorJournal < ErrorJournal + # The 'errors' parameter may be a single error (instead of an array). + # + def remove_errors (wfid, errors) - def initialize (service_name, application_context) + errors = Array(errors) - super + log = get_error_log wfid - @per_processes = {} - end + errors.each do |e| + log.delete e + end + end - # - # Returns a list (older first) of the errors for a process - # instance identified by its fei or wfid. - # - # Will return an empty list if there a no errors for the process - # instances. - # - def get_error_log (wfid) + # + # Reads all the error logs currently stored. + # Returns a hash wfid --> error list. + # + def get_error_logs - wfid = extract_wfid wfid, true - @per_processes[wfid] or [] - end + @per_processes + end - # - # Removes the error log for a process instance. - # - def remove_error_log (wfid) + protected - wfid = extract_wfid wfid, true - @per_processes.delete(wfid) - end + def record_error (error) - # - # Removes a list of errors from the error journal. - # - # The 'errors' parameter may be a single error (instead of an array). - # - def remove_errors (wfid, errors) + (@per_processes[error.wfid] ||= []) << error + # not that unreadable after all... + end + end - errors = Array(errors) + # + # A Journal that only keep track of error in process execution. + # + class YamlErrorJournal < ErrorJournal - log = get_error_log wfid + attr_reader :workdir - errors.each do |e| - log.delete e - end - end + def initialize (service_name, application_context) - # - # Reads all the error logs currently stored. - # Returns a hash wfid --> error list. - # - def get_error_logs + require 'openwfe/storage/yamlcustom' + # making sure this file has been required at this point + # this yamlcustom thing prevents the whole OpenWFE ecosystem + # to get serialized :) - @per_processes - end + super - protected + @workdir = get_work_directory + "/ejournal" + #@workdir = File.expand_path @workdir - def record_error (error) - - (@per_processes[error.wfid] ||= []) << error - # not that unreadable after all... - end + FileUtils.makedirs(@workdir) unless File.exist?(@workdir) end - + # - # A Journal that only keep track of error in process execution. + # Returns a list (older first) of the errors for a process + # instance identified by its fei or wfid. # - class YamlErrorJournal < ErrorJournal + # Will return an empty list if there a no errors for the process + # instances. + # + def get_error_log (wfid) - attr_reader :workdir + path = get_path wfid - def initialize (service_name, application_context) + return [] unless File.exist?(path) - require 'openwfe/storage/yamlcustom' - # making sure this file has been required at this point - # this yamlcustom thing prevents the whole OpenWFE ecosystem - # to get serialized :) + read_error_log_from path + end - super + # + # Copies the error log of a process instance to a give path (and + # filename). + # + # Could be useful when one has to perform replay operations and wants + # to keep a copy of the original error[s]. + # + def copy_error_log_to (wfid, path) - @workdir = get_work_directory + "/ejournal" + original_path = get_path wfid + FileUtils.copy_file original_path, path + end - FileUtils.makedirs(@workdir) unless File.exist?(@workdir) - end + # + # Reads an error log from a specific file (possibly as copied over + # via copy_error_log_to()). + # + def read_error_log_from (path) - # - # Returns a list (older first) of the errors for a process - # instance identified by its fei or wfid. - # - # Will return an empty list if there a no errors for the process - # instances. - # - def get_error_log (wfid) + raise "no error log file at #{path}" unless File.exist?(path) - path = get_path wfid + File.open(path) do |f| + s = YAML.load_stream f + s.documents + end + end - return [] unless File.exist?(path) + # + # Removes the error log of a specific process instance. + # Could be a good idea after a succesful replay operation. + # + # 'wfid' may be either a workflow instance id (String) either + # a FlowExpressionId instance. + # + def remove_error_log (wfid) - read_error_log_from path - end + path = get_path wfid - # - # Copies the error log of a process instance to a give path (and - # filename). - # - # Could be useful when one has to perform replay operations and wants - # to keep a copy of the original error[s]. - # - def copy_error_log_to (wfid, path) + File.delete(path) if File.exist?(path) + end - original_path = get_path wfid - FileUtils.copy_file original_path, path - end + # + # Removes a list of errors from this error journal. + # + def remove_errors (wfid, errors) - # - # Reads an error log from a specific file (possibly as copied over - # via copy_error_log_to()). - # - def read_error_log_from (path) + errors = Array(errors) - raise "no error log file at #{path}" unless File.exist?(path) + # load all errors - File.open(path) do |f| - s = YAML.load_stream f - s.documents - end - end + log = get_error_log wfid - # - # Removes the error log of a specific process instance. - # Could be a good idea after a succesful replay operation. - # - # 'wfid' may be either a workflow instance id (String) either - # a FlowExpressionId instance. - # - def remove_error_log (wfid) + # remove the given errors - path = get_path wfid + errors.each do |e| + log.delete e + end - File.delete(path) if File.exist?(path) - end + # rewrite error file - # - # Removes a list of errors from this error journal. - # - def remove_errors (wfid, errors) + path = get_path wfid - errors = Array(errors) + if log.size > 0 - # load all errors + File.open(path, "w") do |f| + log.each do |e| + f.puts e.to_yaml + end + end + else - log = get_error_log wfid + File.delete path + end + end - # remove the given errors + # + # Reads all the error logs currently stored. + # Returns a hash wfid --> error list. + # + def get_error_logs - errors.each do |e| - log.delete e - end + result = {} - # rewrite error file + Find.find(@workdir) do |path| - path = get_path wfid + next unless path.match(/\.ejournal$/) - if log.size > 0 + log = read_error_log_from path + result[log.first.fei.wfid] = log + end - File.open(path, "w") do |f| - log.each do |e| - f.puts e.to_yaml - end - end - else + result + end - File.delete path - end - end + protected - # - # Reads all the error logs currently stored. - # Returns a hash wfid --> error list. - # - def get_error_logs + # + # logs the error as a yaml string in an error log file + # (there is one error log file per workflow instance). + # + def record_error (error) - result = {} + path = get_path error.fei - Find.find(@workdir) do |path| - next unless path.endswith(".ejournal") - wfid = path[0..-9] - log = read_error_log wfid - result[wfid] = log - end + dirpath = File.dirname path - result + FileUtils.mkdir_p(dirpath) unless File.exist?(dirpath) + + File.open path, "a+" do |f| + f.puts error.to_yaml end + end - protected + # + # Returns the path to the error log file of a specific process + # instance. + # + def get_path (fei_or_wfid) - # - # logs the error as a yaml string in an error log file - # (there is one error log file per workflow instance). - # - def record_error (error) - - path = get_path error.fei - - File.open(path, "a+") do |f| - f.puts error.to_yaml - end - end - - # - # Returns the path to the error log file of a specific process - # instance. - # - def get_path (fei_or_wfid) - - @workdir + "/" + extract_wfid(fei_or_wfid, true) + ".ejournal" - end - end + @workdir + "/" + extract_wfid(fei_or_wfid, true) + ".ejournal" + end + end end