lib/openwfe/expool/errorjournal.rb in ruote-0.9.19 vs lib/openwfe/expool/errorjournal.rb in ruote-0.9.20

- old
+ new

@@ -1,47 +1,30 @@ -# #-- -# Copyright (c) 2007-2008, John Mettraux, OpenWFE.org -# All rights reserved. +# Copyright (c) 2007-2009, John Mettraux, jmettraux@gmail.com # -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: # -# . Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. # -# . Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. # -# . Neither the name of the "OpenWFE" nor the names of its contributors may be -# used to endorse or promote products derived from this software without -# specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. +# Made in Japan. #++ -# -# -# "made in Japan" -# -# John Mettraux at openwfe.org -# -require 'find' -require 'fileutils' - require 'openwfe/service' require 'openwfe/omixins' require 'openwfe/rudefinitions' @@ -79,10 +62,12 @@ # # The String stack trace of the error. # attr_reader :stacktrace + alias :backtrace :stacktrace + # # The error class (String) of the top level error # attr_reader :error_class @@ -130,11 +115,11 @@ # # Returns true if the other instance is a ProcessError and is the # same error as this one. # def == (other) - return false unless other.is_a?(ProcessError) + return false unless other.is_a?(OpenWFE::ProcessError) return to_s == other.to_s # # a bit costly but as it's only used by resume_process()... end end @@ -143,43 +128,55 @@ # This is a base class for all error journal, don't instantiate, # work rather with InMemoryErrorJournal (only for testing envs though), # or YamlErrorJournal. # class ErrorJournal < Service + include OwfeServiceLocator include FeiMixin def initialize (service_name, application_context) super - get_expression_pool.add_observer :error do |event, *args| + @observers = [] + + @observers << get_expression_pool.add_observer(:error) do |evt, *args| # # logs each error occurring in the expression pool begin - record_error(ProcessError.new(*args)) + record_error(OpenWFE::ProcessError.new(*args)) rescue Exception => e - lwarn { "(failed to record error : #{e})" } - lwarn { "*** process error : \n" + args.join("\n") } + lwarn { "(failed to record error : #{e})\n#{e.backtrace.join("\n")}" } + lwarn { "*** process error : \n#{args.join("\n")}" } end end - get_expression_pool.add_observer :terminate do |event, *args| + @observers << get_expression_pool.add_observer(:terminate) do |evt, *args| # # removes error log when a process terminates fei = args[0].fei - remove_error_log fei.wfid \ - if fei.is_in_parent_process? + remove_error_log(fei.wfid) if fei.is_in_parent_process? end end # + # Stops this journal, takes care of 'unobserving' the expression pool + # + def stop + + super + + @observers.each { |o| get_expression_pool.remove_observer(o) } + end + + # # Returns true if the given wfid (or fei) (process instance id) # has had errors. # def has_errors? (wfid) @@ -190,10 +187,12 @@ # Takes care of removing an error from the error journal and # they replays its process at that point. # def replay_at_error (error) + error = error.as_owfe_error if error.respond_to?(:as_owfe_error) + remove_errors( error.fei.parent_wfid, error) get_workqueue.push( @@ -213,22 +212,16 @@ # Could be useful when considering a process where multiple replay # attempts failed. # def ErrorJournal.reduce_error_list (errors) - h = {} - - errors.each do |e| - h[e.fei] = e - # - # last errors do override previous errors for the - # same fei - end - - h.values.sort do |error_a, error_b| + errors.inject({}) { |h, e| + h[e.fei] = e; h + # last errors do override previous errors for the same fei + }.values.sort { |error_a, error_b| error_a.date <=> error_b.date - end + } end end # # Stores all the errors in a hash... For testing purposes only, like @@ -250,20 +243,20 @@ # Will return an empty list if there a no errors for the process # instances. # def get_error_log (wfid) - wfid = extract_wfid wfid, true - @per_processes[wfid] or [] + wfid = extract_wfid(wfid, true) + @per_processes[wfid] || [] end # # Removes the error log for a process instance. # def remove_error_log (wfid) - wfid = extract_wfid wfid, true + wfid = extract_wfid(wfid, true) @per_processes.delete(wfid) end # # Removes a list of errors from the error journal. @@ -272,14 +265,14 @@ # def remove_errors (wfid, errors) errors = Array(errors) - log = get_error_log wfid + log = get_error_log(wfid) errors.each do |e| - log.delete e + log.delete(e) end end # # Reads all the error logs currently stored. @@ -294,169 +287,8 @@ def record_error (error) (@per_processes[error.wfid] ||= []) << error # not that unreadable after all... - end - end - - # - # A Journal that only keep track of error in process execution. - # - class YamlErrorJournal < ErrorJournal - - attr_reader :workdir - - def initialize (service_name, application_context) - - require 'openwfe/storage/yamlcustom' - # making sure this file has been required at this point - # this yamlcustom thing prevents the whole OpenWFE ecosystem - # to get serialized :) - - super - - @workdir = get_work_directory + "/ejournal" - #@workdir = File.expand_path @workdir - - FileUtils.makedirs(@workdir) unless File.exist?(@workdir) - end - - # - # Returns a list (older first) of the errors for a process - # instance identified by its fei or wfid. - # - # Will return an empty list if there a no errors for the process - # instances. - # - def get_error_log (wfid) - - path = get_path wfid - - return [] unless File.exist?(path) - - read_error_log_from path - end - - # - # Copies the error log of a process instance to a give path (and - # filename). - # - # Could be useful when one has to perform replay operations and wants - # to keep a copy of the original error[s]. - # - def copy_error_log_to (wfid, path) - - original_path = get_path wfid - FileUtils.copy_file original_path, path - end - - # - # Reads an error log from a specific file (possibly as copied over - # via copy_error_log_to()). - # - def read_error_log_from (path) - - raise "no error log file at #{path}" unless File.exist?(path) - - File.open(path) do |f| - s = YAML.load_stream f - s.documents - end - end - - # - # Removes the error log of a specific process instance. - # Could be a good idea after a succesful replay operation. - # - # 'wfid' may be either a workflow instance id (String) either - # a FlowExpressionId instance. - # - def remove_error_log (wfid) - - path = get_path wfid - - File.delete(path) if File.exist?(path) - end - - # - # Removes a list of errors from this error journal. - # - def remove_errors (wfid, errors) - - errors = Array(errors) - - # load all errors - - log = get_error_log wfid - - # remove the given errors - - errors.each do |e| - log.delete e - end - - # rewrite error file - - path = get_path wfid - - if log.size > 0 - - File.open(path, "w") do |f| - log.each do |e| - f.puts e.to_yaml - end - end - else - - File.delete path - end - end - - # - # Reads all the error logs currently stored. - # Returns a hash wfid --> error list. - # - def get_error_logs - - result = {} - - Find.find(@workdir) do |path| - - next unless path.match(/\.ejournal$/) - - log = read_error_log_from path - result[log.first.fei.wfid] = log - end - - result - end - - protected - - # - # logs the error as a yaml string in an error log file - # (there is one error log file per workflow instance). - # - def record_error (error) - - path = get_path error.fei - - dirpath = File.dirname path - - FileUtils.mkdir_p(dirpath) unless File.exist?(dirpath) - - File.open path, "a+" do |f| - f.puts error.to_yaml - end - end - - # - # Returns the path to the error log file of a specific process - # instance. - # - def get_path (fei_or_wfid) - - @workdir + "/" + extract_wfid(fei_or_wfid, true) + ".ejournal" end end end