lib/openwfe/expool/errorjournal.rb in ruote-0.9.19 vs lib/openwfe/expool/errorjournal.rb in ruote-0.9.20
- old
+ new
@@ -1,47 +1,30 @@
-#
#--
-# Copyright (c) 2007-2008, John Mettraux, OpenWFE.org
-# All rights reserved.
+# Copyright (c) 2007-2009, John Mettraux, jmettraux@gmail.com
#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
#
-# . Redistributions of source code must retain the above copyright notice, this
-# list of conditions and the following disclaimer.
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
#
-# . Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
-# and/or other materials provided with the distribution.
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
#
-# . Neither the name of the "OpenWFE" nor the names of its contributors may be
-# used to endorse or promote products derived from this software without
-# specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
+# Made in Japan.
#++
-#
-#
-# "made in Japan"
-#
-# John Mettraux at openwfe.org
-#
-require 'find'
-require 'fileutils'
-
require 'openwfe/service'
require 'openwfe/omixins'
require 'openwfe/rudefinitions'
@@ -79,10 +62,12 @@
#
# The String stack trace of the error.
#
attr_reader :stacktrace
+ alias :backtrace :stacktrace
+
#
# The error class (String) of the top level error
#
attr_reader :error_class
@@ -130,11 +115,11 @@
#
# Returns true if the other instance is a ProcessError and is the
# same error as this one.
#
def == (other)
- return false unless other.is_a?(ProcessError)
+ return false unless other.is_a?(OpenWFE::ProcessError)
return to_s == other.to_s
#
# a bit costly but as it's only used by resume_process()...
end
end
@@ -143,43 +128,55 @@
# This is a base class for all error journal, don't instantiate,
# work rather with InMemoryErrorJournal (only for testing envs though),
# or YamlErrorJournal.
#
class ErrorJournal < Service
+
include OwfeServiceLocator
include FeiMixin
def initialize (service_name, application_context)
super
- get_expression_pool.add_observer :error do |event, *args|
+ @observers = []
+
+ @observers << get_expression_pool.add_observer(:error) do |evt, *args|
#
# logs each error occurring in the expression pool
begin
- record_error(ProcessError.new(*args))
+ record_error(OpenWFE::ProcessError.new(*args))
rescue Exception => e
- lwarn { "(failed to record error : #{e})" }
- lwarn { "*** process error : \n" + args.join("\n") }
+ lwarn { "(failed to record error : #{e})\n#{e.backtrace.join("\n")}" }
+ lwarn { "*** process error : \n#{args.join("\n")}" }
end
end
- get_expression_pool.add_observer :terminate do |event, *args|
+ @observers << get_expression_pool.add_observer(:terminate) do |evt, *args|
#
# removes error log when a process terminates
fei = args[0].fei
- remove_error_log fei.wfid \
- if fei.is_in_parent_process?
+ remove_error_log(fei.wfid) if fei.is_in_parent_process?
end
end
#
+ # Stops this journal, takes care of 'unobserving' the expression pool
+ #
+ def stop
+
+ super
+
+ @observers.each { |o| get_expression_pool.remove_observer(o) }
+ end
+
+ #
# Returns true if the given wfid (or fei) (process instance id)
# has had errors.
#
def has_errors? (wfid)
@@ -190,10 +187,12 @@
# Takes care of removing an error from the error journal and
# they replays its process at that point.
#
def replay_at_error (error)
+ error = error.as_owfe_error if error.respond_to?(:as_owfe_error)
+
remove_errors(
error.fei.parent_wfid,
error)
get_workqueue.push(
@@ -213,22 +212,16 @@
# Could be useful when considering a process where multiple replay
# attempts failed.
#
def ErrorJournal.reduce_error_list (errors)
- h = {}
-
- errors.each do |e|
- h[e.fei] = e
- #
- # last errors do override previous errors for the
- # same fei
- end
-
- h.values.sort do |error_a, error_b|
+ errors.inject({}) { |h, e|
+ h[e.fei] = e; h
+ # last errors do override previous errors for the same fei
+ }.values.sort { |error_a, error_b|
error_a.date <=> error_b.date
- end
+ }
end
end
#
# Stores all the errors in a hash... For testing purposes only, like
@@ -250,20 +243,20 @@
# Will return an empty list if there a no errors for the process
# instances.
#
def get_error_log (wfid)
- wfid = extract_wfid wfid, true
- @per_processes[wfid] or []
+ wfid = extract_wfid(wfid, true)
+ @per_processes[wfid] || []
end
#
# Removes the error log for a process instance.
#
def remove_error_log (wfid)
- wfid = extract_wfid wfid, true
+ wfid = extract_wfid(wfid, true)
@per_processes.delete(wfid)
end
#
# Removes a list of errors from the error journal.
@@ -272,14 +265,14 @@
#
def remove_errors (wfid, errors)
errors = Array(errors)
- log = get_error_log wfid
+ log = get_error_log(wfid)
errors.each do |e|
- log.delete e
+ log.delete(e)
end
end
#
# Reads all the error logs currently stored.
@@ -294,169 +287,8 @@
def record_error (error)
(@per_processes[error.wfid] ||= []) << error
# not that unreadable after all...
- end
- end
-
- #
- # A Journal that only keep track of error in process execution.
- #
- class YamlErrorJournal < ErrorJournal
-
- attr_reader :workdir
-
- def initialize (service_name, application_context)
-
- require 'openwfe/storage/yamlcustom'
- # making sure this file has been required at this point
- # this yamlcustom thing prevents the whole OpenWFE ecosystem
- # to get serialized :)
-
- super
-
- @workdir = get_work_directory + "/ejournal"
- #@workdir = File.expand_path @workdir
-
- FileUtils.makedirs(@workdir) unless File.exist?(@workdir)
- end
-
- #
- # Returns a list (older first) of the errors for a process
- # instance identified by its fei or wfid.
- #
- # Will return an empty list if there a no errors for the process
- # instances.
- #
- def get_error_log (wfid)
-
- path = get_path wfid
-
- return [] unless File.exist?(path)
-
- read_error_log_from path
- end
-
- #
- # Copies the error log of a process instance to a give path (and
- # filename).
- #
- # Could be useful when one has to perform replay operations and wants
- # to keep a copy of the original error[s].
- #
- def copy_error_log_to (wfid, path)
-
- original_path = get_path wfid
- FileUtils.copy_file original_path, path
- end
-
- #
- # Reads an error log from a specific file (possibly as copied over
- # via copy_error_log_to()).
- #
- def read_error_log_from (path)
-
- raise "no error log file at #{path}" unless File.exist?(path)
-
- File.open(path) do |f|
- s = YAML.load_stream f
- s.documents
- end
- end
-
- #
- # Removes the error log of a specific process instance.
- # Could be a good idea after a succesful replay operation.
- #
- # 'wfid' may be either a workflow instance id (String) either
- # a FlowExpressionId instance.
- #
- def remove_error_log (wfid)
-
- path = get_path wfid
-
- File.delete(path) if File.exist?(path)
- end
-
- #
- # Removes a list of errors from this error journal.
- #
- def remove_errors (wfid, errors)
-
- errors = Array(errors)
-
- # load all errors
-
- log = get_error_log wfid
-
- # remove the given errors
-
- errors.each do |e|
- log.delete e
- end
-
- # rewrite error file
-
- path = get_path wfid
-
- if log.size > 0
-
- File.open(path, "w") do |f|
- log.each do |e|
- f.puts e.to_yaml
- end
- end
- else
-
- File.delete path
- end
- end
-
- #
- # Reads all the error logs currently stored.
- # Returns a hash wfid --> error list.
- #
- def get_error_logs
-
- result = {}
-
- Find.find(@workdir) do |path|
-
- next unless path.match(/\.ejournal$/)
-
- log = read_error_log_from path
- result[log.first.fei.wfid] = log
- end
-
- result
- end
-
- protected
-
- #
- # logs the error as a yaml string in an error log file
- # (there is one error log file per workflow instance).
- #
- def record_error (error)
-
- path = get_path error.fei
-
- dirpath = File.dirname path
-
- FileUtils.mkdir_p(dirpath) unless File.exist?(dirpath)
-
- File.open path, "a+" do |f|
- f.puts error.to_yaml
- end
- end
-
- #
- # Returns the path to the error log file of a specific process
- # instance.
- #
- def get_path (fei_or_wfid)
-
- @workdir + "/" + extract_wfid(fei_or_wfid, true) + ".ejournal"
end
end
end