lib/ruote/log/wait_logger.rb in ruote-2.2.0 vs lib/ruote/log/wait_logger.rb in ruote-2.3.0

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -20,48 +20,255 @@ # THE SOFTWARE. # # Made in Japan. #++ -require 'ruote/log/test_logger' - module Ruote + # The error raised by WaitLogger#wait_for upon a timeout. # - # A helper logger for quickstart examples. + class LoggerTimeout < StandardError + + def initialize(interests, timeout) + + super("waited for #{interests.inspect}, timed out after #{timeout}s") + end + end + # - # Keeps a maximum of 147 messages. + # The logic behind Ruote::Dashboard#wait_for is implemented here. # - class WaitLogger < TestLogger + # This logger keeps track of the last 147 events. This number can + # be tweaked via the 'wait_logger_max' storage option + # (http://ruote.rubyforge.org/configuration.html) + # + # One doesn't play directly with this class. It's available only via + # the Ruote::Dashboard#wait_for and Ruote::Dashboard#noisy= + # + # To access the log of processed msgs, look at history services, not + # at this wait_logger. + # + # === options (storage initialization options) + # + # wait_logger_max(Integer):: + # defaults to 147, max number of recent records to keep track of + # wait_logger_timeout(Integer):: + # defaults to 60 (seconds), #wait_for times out after how many seconds? + # + class WaitLogger + require 'ruote/log/fancy_printing' + + attr_reader :seen + attr_reader :log + + # When set to true, this logger will spit out the ruote activity + # happening in this Ruby's runtime ruote worker (if any) to $stdout. + # attr_accessor :noisy + # The timeout for #wait_for. Defaults to 60 (seconds). When set to + # number inferior or equal to zero, no timeout will be enforced. + # + attr_accessor :timeout + def initialize(context) @context = context - @color = 33 - @context.worker.subscribe(:all, self) if @context.worker + @seen = [] + @log = [] + @waiting = [] - @noisy = false @count = -1 + @color = 33 + @noisy = false - @seen = [] - @waiting = [] + @log_max = context['wait_logger_max'] || 147 + @timeout = context['wait_logger_timeout'] || 60 # in seconds + + @check_mutex = Mutex.new end - def notify(msg) + # The context will call this method for each msg sucessfully processed + # by the worker. + # + def on_msg(msg) - puts(pretty_print(msg)) if @noisy + puts(fancy_print(msg, @noisy)) if @noisy - #return if @waiting.size < 1 - #check_msg(msg) + return if msg['action'] == 'noop' @seen << msg - @seen.shift if @seen.size > 147 + @log << msg - check_waiting + while @log.size > @log_max; @log.shift; end + while @seen.size > @log_max; @seen.shift; end + end + + # Returns an array of the latest msgs, but fancy-printed. The oldest + # first. + # + def fancy_log + + @log.collect { |msg| fancy_print(msg) } + end + + # Debug only : dumps all the seen events to $stdout + # + def dump + + @seen.collect { |msg| fancy_print(msg) }.join("\n") + end + + # Blocks until one or more interests are satisfied. + # + # interests must be an array of interests. Please refer to + # Dashboard#wait_for documentation for allowed values of each interest. + # + # If multiple interests are given, wait_for blocks until + # all of the interests are satisfied. + # + # wait_for may only be used by one thread at a time. If one + # thread calls wait_for and later another thread calls wait_for + # while the first thread is waiting, the first thread's + # interests are lost and the first thread will never wake up. + # + def wait_for(interests, opts={}) + + @waiting << [ Thread.current, interests ] + + Thread.current['__result__'] = nil + start = Time.now + + to = opts[:timeout] || @timeout + to = nil if to.nil? || to <= 0 + + loop do + + raise( + Ruote::LoggerTimeout.new(interests, to) + ) if to && (Time.now - start) > to + + @check_mutex.synchronize { check_waiting } + + break if Thread.current['__result__'] + + sleep 0.007 + end + + Thread.current['__result__'] + end + + def color=(c) + + @color = c + end + + def self.fp(msg) + + @logger ||= TestLogger.new(nil) + puts @logger.send(:fancy_print, msg) + end + + protected + + def check_waiting + + while @waiting.any? and msg = @seen.shift + + @waiting.delete_if do |thread, interests| + thread['__result__'] = msg if matches(interests, msg) + (interests.size < 1) + end + end + end + + FINAL_ACTIONS = %w[ + terminated ceased error_intercepted + ] + ACTIONS = FINAL_ACTIONS + %w[ + launch apply reply + fail + dispatch dispatched receive + cancel dispatch_cancel kill + pause resume dispatch_pause dispatch_resume + regenerate + pause_process resume_process cancel_process kill_process + reput noop raise + respark + ] + + # Checks whether message msg matches any of interests being waited for. + # + # Some interests look for actions on particular workflows (e.g., + # waiting for some workflow to finish). Other interests are not + # attached to any particular workflow (e.g., :inactive waits until + # the engine finishes processing all active and pending workflows) + # but are still satisfied when actions happen on workflows (e.g., + # the last workflow being run finishes). + # + # Returns true if all interests being waited for have been satisfied, + # false otherwise. + # + def matches(interests, msg) + + action = msg['action'] + + interests.each do |interest| + + satisfied = case interest + + when :or_error + # + # let's force an immediate reply + + interests.clear if action == 'error_intercepted' + + when :inactive + + (FINAL_ACTIONS.include?(action) && @context.worker.inactive?) + + when :empty + + (action == 'terminated' && @context.storage.empty?('expressions')) + + when Symbol + + (action == 'dispatch' && msg['participant_name'] == interest.to_s) + + when Fixnum + + interests.delete(interest) + + if (interest > 1) + interests << (interest - 1) + false + else + true + end + + when Hash + + interest.all? { |k, v| + k = 'tree.0' if k == 'exp_name' + Ruote.lookup(msg, k) == v + } + + when /^[a-z_]+$/ + + (action == interest) + + else # wfid + + (FINAL_ACTIONS.include?(action) && msg['wfid'] == interest) + end + + interests.delete(interest) if satisfied + end + + (interests.size < 1) end end end