lib/ruote/log/wait_logger.rb in ruote-2.2.0 vs lib/ruote/log/wait_logger.rb in ruote-2.3.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -20,48 +20,255 @@
# THE SOFTWARE.
#
# Made in Japan.
#++
-require 'ruote/log/test_logger'
-
module Ruote
+ # The error raised by WaitLogger#wait_for upon a timeout.
#
- # A helper logger for quickstart examples.
+ class LoggerTimeout < StandardError
+
+ def initialize(interests, timeout)
+
+ super("waited for #{interests.inspect}, timed out after #{timeout}s")
+ end
+ end
+
#
- # Keeps a maximum of 147 messages.
+ # The logic behind Ruote::Dashboard#wait_for is implemented here.
#
- class WaitLogger < TestLogger
+ # This logger keeps track of the last 147 events. This number can
+ # be tweaked via the 'wait_logger_max' storage option
+ # (http://ruote.rubyforge.org/configuration.html)
+ #
+ # One doesn't play directly with this class. It's available only via
+ # the Ruote::Dashboard#wait_for and Ruote::Dashboard#noisy=
+ #
+ # To access the log of processed msgs, look at history services, not
+ # at this wait_logger.
+ #
+ # === options (storage initialization options)
+ #
+ # wait_logger_max(Integer)::
+ # defaults to 147, max number of recent records to keep track of
+ # wait_logger_timeout(Integer)::
+ # defaults to 60 (seconds), #wait_for times out after how many seconds?
+ #
+ class WaitLogger
+ require 'ruote/log/fancy_printing'
+
+ attr_reader :seen
+ attr_reader :log
+
+ # When set to true, this logger will spit out the ruote activity
+ # happening in this Ruby's runtime ruote worker (if any) to $stdout.
+ #
attr_accessor :noisy
+ # The timeout for #wait_for. Defaults to 60 (seconds). When set to
+ # number inferior or equal to zero, no timeout will be enforced.
+ #
+ attr_accessor :timeout
+
def initialize(context)
@context = context
- @color = 33
- @context.worker.subscribe(:all, self) if @context.worker
+ @seen = []
+ @log = []
+ @waiting = []
- @noisy = false
@count = -1
+ @color = 33
+ @noisy = false
- @seen = []
- @waiting = []
+ @log_max = context['wait_logger_max'] || 147
+ @timeout = context['wait_logger_timeout'] || 60 # in seconds
+
+ @check_mutex = Mutex.new
end
- def notify(msg)
+ # The context will call this method for each msg sucessfully processed
+ # by the worker.
+ #
+ def on_msg(msg)
- puts(pretty_print(msg)) if @noisy
+ puts(fancy_print(msg, @noisy)) if @noisy
- #return if @waiting.size < 1
- #check_msg(msg)
+ return if msg['action'] == 'noop'
@seen << msg
- @seen.shift if @seen.size > 147
+ @log << msg
- check_waiting
+ while @log.size > @log_max; @log.shift; end
+ while @seen.size > @log_max; @seen.shift; end
+ end
+
+ # Returns an array of the latest msgs, but fancy-printed. The oldest
+ # first.
+ #
+ def fancy_log
+
+ @log.collect { |msg| fancy_print(msg) }
+ end
+
+ # Debug only : dumps all the seen events to $stdout
+ #
+ def dump
+
+ @seen.collect { |msg| fancy_print(msg) }.join("\n")
+ end
+
+ # Blocks until one or more interests are satisfied.
+ #
+ # interests must be an array of interests. Please refer to
+ # Dashboard#wait_for documentation for allowed values of each interest.
+ #
+ # If multiple interests are given, wait_for blocks until
+ # all of the interests are satisfied.
+ #
+ # wait_for may only be used by one thread at a time. If one
+ # thread calls wait_for and later another thread calls wait_for
+ # while the first thread is waiting, the first thread's
+ # interests are lost and the first thread will never wake up.
+ #
+ def wait_for(interests, opts={})
+
+ @waiting << [ Thread.current, interests ]
+
+ Thread.current['__result__'] = nil
+ start = Time.now
+
+ to = opts[:timeout] || @timeout
+ to = nil if to.nil? || to <= 0
+
+ loop do
+
+ raise(
+ Ruote::LoggerTimeout.new(interests, to)
+ ) if to && (Time.now - start) > to
+
+ @check_mutex.synchronize { check_waiting }
+
+ break if Thread.current['__result__']
+
+ sleep 0.007
+ end
+
+ Thread.current['__result__']
+ end
+
+ def color=(c)
+
+ @color = c
+ end
+
+ def self.fp(msg)
+
+ @logger ||= TestLogger.new(nil)
+ puts @logger.send(:fancy_print, msg)
+ end
+
+ protected
+
+ def check_waiting
+
+ while @waiting.any? and msg = @seen.shift
+
+ @waiting.delete_if do |thread, interests|
+ thread['__result__'] = msg if matches(interests, msg)
+ (interests.size < 1)
+ end
+ end
+ end
+
+ FINAL_ACTIONS = %w[
+ terminated ceased error_intercepted
+ ]
+ ACTIONS = FINAL_ACTIONS + %w[
+ launch apply reply
+ fail
+ dispatch dispatched receive
+ cancel dispatch_cancel kill
+ pause resume dispatch_pause dispatch_resume
+ regenerate
+ pause_process resume_process cancel_process kill_process
+ reput noop raise
+ respark
+ ]
+
+ # Checks whether message msg matches any of interests being waited for.
+ #
+ # Some interests look for actions on particular workflows (e.g.,
+ # waiting for some workflow to finish). Other interests are not
+ # attached to any particular workflow (e.g., :inactive waits until
+ # the engine finishes processing all active and pending workflows)
+ # but are still satisfied when actions happen on workflows (e.g.,
+ # the last workflow being run finishes).
+ #
+ # Returns true if all interests being waited for have been satisfied,
+ # false otherwise.
+ #
+ def matches(interests, msg)
+
+ action = msg['action']
+
+ interests.each do |interest|
+
+ satisfied = case interest
+
+ when :or_error
+ #
+ # let's force an immediate reply
+
+ interests.clear if action == 'error_intercepted'
+
+ when :inactive
+
+ (FINAL_ACTIONS.include?(action) && @context.worker.inactive?)
+
+ when :empty
+
+ (action == 'terminated' && @context.storage.empty?('expressions'))
+
+ when Symbol
+
+ (action == 'dispatch' && msg['participant_name'] == interest.to_s)
+
+ when Fixnum
+
+ interests.delete(interest)
+
+ if (interest > 1)
+ interests << (interest - 1)
+ false
+ else
+ true
+ end
+
+ when Hash
+
+ interest.all? { |k, v|
+ k = 'tree.0' if k == 'exp_name'
+ Ruote.lookup(msg, k) == v
+ }
+
+ when /^[a-z_]+$/
+
+ (action == interest)
+
+ else # wfid
+
+ (FINAL_ACTIONS.include?(action) && msg['wfid'] == interest)
+ end
+
+ interests.delete(interest) if satisfied
+ end
+
+ (interests.size < 1)
end
end
end