lib/openwfe/expool/history.rb in ruote-0.9.18 vs lib/openwfe/expool/history.rb in ruote-0.9.19
- old
+ new
@@ -1,174 +1,262 @@
#
#--
-# Copyright (c) 2007, John Mettraux, OpenWFE.org
+# Copyright (c) 2007-2008, John Mettraux, OpenWFE.org
# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
+#
+# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
-#
+#
# . Redistributions of source code must retain the above copyright notice, this
-# list of conditions and the following disclaimer.
-#
-# . Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
+# list of conditions and the following disclaimer.
+#
+# . Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
-#
+#
# . Neither the name of the "OpenWFE" nor the names of its contributors may be
# used to endorse or promote products derived from this software without
# specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#++
#
#
# "made in Japan"
#
# John Mettraux at openwfe.org
#
-#require 'date'
-
require 'openwfe/service'
require 'openwfe/omixins'
require 'openwfe/rudefinitions'
module OpenWFE
-
+
+ #
+ # A Mixin for history modules
+ #
+ module HistoryMixin
+ include ServiceMixin
+ include OwfeServiceLocator
+
+ EXPOOL_EVENTS = [
+ :launch, # launching of a [sub]process instance
+ :terminate, # process instance terminates
+ :cancel, # cancelling an expression
+ :error,
+ #:reschedule, # at restart, engine reschedules a timed expression
+ :stop, # stopping the process engine
+ :pause, # pausing a process
+ :resume, # resuming a process
+ #:launch_child, # launching a process 'fragment'
+ #:launch_orphan, # firing and forgetting a sub process
+ #:forget, # forgetting an expression (making it an orphan)
+ #:remove, # removing an expression
+ #:update, # expression changed, reinsertion into storage
+ #:apply,
+ #:reply,
+ #:reply_to_parent, # expression replies to its parent expression
+ ]
+
+ def service_init (service_name, application_context)
+
+ super
+
+ get_expression_pool.add_observer(:all) do |event, *args|
+ handle :expool, event, *args
+ end
+ get_participant_map.add_observer(:all) do |event, *args|
+ handle :pmap, event, *args
+ end
+ end
+
#
- # A Mixin for history modules
+ # filter events, eventually logs them
#
- module HistoryMixin
- include ServiceMixin, OwfeServiceLocator
+ def handle (source, event, *args)
- def service_init (service_name, application_context)
+ # filtering expool events
- super
+ return if source == :expool and (not EXPOOL_EVENTS.include?(event))
- get_expression_pool.add_observer(:all) do |event, *args|
- log(event, *args)
- end
- end
+ # normalizing pmap events
- def log (event, *args)
- raise NotImplementedError.new(
- "please provide an implementation of log(e, fei, wi)")
- end
+ return if source == :pmap and args.first == :after_consume
+
+ if source == :pmap and (not event.is_a?(Symbol))
+ return if args.first == :apply
+ e = event
+ event = args.first
+ args[0] = e
+ end
+ # have to do that swap has pmap uses the participant name as
+ # a "channel name"
+
+ # ok, do log now
+
+ log source, event, *args
end
#
- # A base implementation for InMemoryHistory and FileHistory.
+ # the logging job itself
#
- class BaseHistory
- include HistoryMixin
- include FeiMixin
+ def log (source, event, *args)
- attr_reader :entries
+ raise NotImplementedError.new(
+ "please provide an implementation of log(source, event, *args)")
+ end
- def initialize (service_name, application_context)
+ #
+ # scans the arguments of the event to determine the fei
+ # (flow expression id) related to the event
+ #
+ def get_fei (args)
- super()
+ args.each do |a|
+ return a.fei if a.respond_to?(:fei)
+ return a if a.is_a?(FlowExpressionId)
+ end
- service_init(service_name, application_context)
- end
+ nil
+ end
- def log (event, *args)
+ #
+ # builds a 'message' string out of the event / args combination
+ #
+ def get_message (source, event, args)
- return if event == :update
- return if event == :reschedule
- return if event == :stop
+ args.inject([]) { |r, a|
+ r << a if a.is_a?(Symbol) or a.is_a?(String)
+ r
+ }.join(" ")
+ end
- msg = "#{Time.now.to_s} -- "
+ #
+ # returns the workitem among the logged args
+ #
+ def get_workitem (args)
- msg << event.to_s
+ args.find { |a| a.is_a?(WorkItem) }
+ end
+ end
- if args.length > 0
- fei = extract_fei args[0]
- msg << " #{fei.to_s}"
- end
+ #
+ # A base implementation for InMemoryHistory and FileHistory.
+ #
+ class History
+ include HistoryMixin
- #msg << " #{args[1].to_s}" \
- # if args.length > 1
+ def initialize (service_name, application_context)
- @output << msg + "\n"
- end
+ super()
+
+ service_init(service_name, application_context)
end
+ def log (source, event, *args)
+
+ t = Time.now
+
+ msg = "#{t} .#{t.usec} -- #{source.to_s} #{event.to_s}"
+
+ msg << " #{get_fei(args).to_s}" if args.length > 0
+
+ m = get_message(source, event, args)
+ msg << " #{m}" if m
+
+ @output << msg + "\n"
+ end
+ end
+
+ #
+ # The simplest implementation, stores the latest 1000 history
+ # entries in memory.
+ #
+ class InMemoryHistory < History
+
#
- # The simplest implementation, stores all history entries in memory.
+ # the max number of history items stored. By default it's 1000
#
- # DO NOT USE IN PRODUCTION, it will trigger an 'out of memory' error
- # sooner or later.
+ attr_accessor :maxsize
+
+ def initialize (service_name, application_context)
+
+ super
+
+ @output = []
+ @maxsize = 1008
+ end
+
#
- # Is only used for unit testing purposes.
+ # Returns the array of entries.
#
- class InMemoryHistory < BaseHistory
+ def entries
+ @output
+ end
- def initialize (service_name, application_context)
- super
+ def log (source, event, *args)
- @output = []
- end
+ super
- #
- # Returns the array of entries.
- #
- def entries
- @output
- end
-
- #
- # Returns all the entries as a String.
- #
- def to_s
- s = ""
- @output.each do |entry|
- s << entry.to_s
- end
- s
- end
+ while @output.size > @maxsize
+ @output.shift
+ end
end
#
- # Simply dumps the history in the work directory in a file named
- # "history.log"
- # Warning : no fancy rotation or compression implemented here.
+ # Returns all the entries as a String.
#
- class FileHistory < BaseHistory
+ def to_s
+ @output.inject("") { |r, entry| r << entry.to_s }
+ end
+ end
- def initialize (service_name, application_context)
+ #
+ # Simply dumps the history in the work directory in a file named
+ # "history.log"
+ # Warning : no fancy rotation or compression implemented here.
+ #
+ class FileHistory < History
- super
+ def initialize (service_name, application_context)
- @output = get_work_directory + "/history.log"
- @output = File.open(@output, "w+")
+ super
- linfo { "new() outputting history to #{@output.path}" }
- end
+ @output = get_work_directory + '/history.log'
+ @output = File.open(@output, 'w+')
- #
- # Returns a handle on the output file instance used by this
- # FileHistory.
- #
- def output_file
- @output
- end
+ linfo { "new() outputting history to #{@output.path}" }
+ end
- def stop
- @output.close
- end
+ def log (source, event, *args)
+
+ super unless @output.closed?
end
+
+ #
+ # Returns a handle on the output file instance used by this
+ # FileHistory.
+ #
+ def output_file
+ @output
+ end
+
+ def stop
+ @output.close
+ end
+ end
end