lib/ruote/worker.rb in ruote-2.1.9 vs lib/ruote/worker.rb in ruote-2.1.10

- old
+ new

@@ -25,13 +25,18 @@ require 'ruote/fei' module Ruote + # + # Workers fetch 'msgs' and 'schedules' from the storage and process them. + # + # Read more at http://ruote.rubyforge.org/configuration.html + # class Worker - EXP_ACTIONS = %w[ reply cancel fail receive ] + EXP_ACTIONS = %w[ reply cancel fail receive dispatched ] # 'apply' is comprised in 'launch' # 'receive' is a ParticipantExpression alias for 'reply' PROC_ACTIONS = %w[ cancel_process kill_process ] @@ -41,22 +46,24 @@ attr_reader :run_thread attr_reader :running def initialize (storage) - @storage = storage - @subscribers = [] - @context = Ruote::Context.new(@storage, self) + # must be ready before the storage is created + # services like Logger to subscribe to the worker + @storage = storage + @context = Ruote::Context.new(storage, self) + @last_time = Time.at(0.0).utc # 1970... @running = true @run_thread = nil @msgs = [] - @sleep_time = 0.001 + @sleep_time = 0.000 end def run while(@running) do @@ -64,11 +71,11 @@ end end def run_in_thread - #Thread.abort_on_exception = true + Thread.abort_on_exception = true # TODO : remove me at some point @running = true @run_thread = Thread.new { run } @@ -80,66 +87,44 @@ end def shutdown @running = false - @run_thread.join if @run_thread - end - # This method is public, since it's used by the DispatchPool when - # reporting an error that occurred in the dispatch/consume thread of - # a participant. - # - def handle_exception (msg, fexp, ex) + return unless @run_thread - wfid = msg['wfid'] || (msg['fei']['wfid'] rescue nil) - fei = msg['fei'] || (fexp.h.fei rescue nil) - - # debug only - - if ARGV.include?('-d') - - puts "\n== worker intercepted error ==" - puts - p ex - ex.backtrace[0, 10].each { |l| puts l } - puts "..." - puts - puts "-- msg --" - msg.keys.sort.each { |k| - puts " #{k.inspect} =>\n#{msg[k].inspect}" - } - puts "-- . --" - puts + begin + @run_thread.join + rescue Exception => e end + end - # on_error ? + # Returns true if the engine system is inactive, ie if all the process + # instances are terminated or are stuck in an error. + # + # NOTE : for now, if a branch of a process is in errors while another is + # still running, this methods will still consider the process instance + # as inactive (and it will return true if all the processes are considered + # inactive). + # + def inactive? - if not(fexp) && fei - fexp = Ruote::Exp::FlowExpression.fetch(@context, fei) - end + # the cheaper tests first - return if fexp && fexp.handle_on_error + return false if @msgs.size > 0 + return false unless @context.storage.empty?('schedules') + return false unless @context.storage.empty?('msgs') - # emit 'msg' + wfids = @context.storage.get_many('expressions').collect { |exp| + exp['fei']['wfid'] + }.sort.uniq - @storage.put_msg( - 'error_intercepted', - 'message' => ex.inspect, - 'wfid' => wfid, - 'msg' => msg) + error_wfids = @context.storage.get_many('errors').collect { |err| + err['fei']['wfid'] + }.sort.uniq - # fill error in the error journal - - @storage.put( - 'type' => 'errors', - '_id' => "err_#{Ruote.to_storage_id(fei)}", - 'message' => ex.inspect, - 'trace' => ex.backtrace.join("\n"), - 'fei' => fei, - 'msg' => msg - ) if fei + (wfids - error_wfids == []) end protected def step @@ -184,37 +169,37 @@ #print r == false ? '*' : '.' break if Time.now.utc - @last_time >= 0.8 end - #puts processed.to_s + #p processed if processed == 0 @sleep_time += 0.001 @sleep_time = 0.499 if @sleep_time > 0.499 sleep(@sleep_time) else - @sleep_time = 0.001 + @sleep_time = 0.000 end end def trigger (schedule) msg = Ruote.fulldup(schedule['msg']) - return false unless @storage.delete(schedule).nil? + return false unless @storage.reserve(schedule) @storage.put_msg(msg.delete('action'), msg) true end def process (msg) return false if cannot_handle(msg) - return false unless @storage.delete(msg).nil? + return false unless @storage.reserve(msg) begin action = msg['action'] @@ -240,13 +225,13 @@ # msg got deleted, might still be interesting for a subscriber end notify(msg) - rescue Exception => ex + rescue Exception => exception - handle_exception(msg, nil, ex) + @context.error_handler.msg_handle(msg, exception) end true end @@ -258,10 +243,14 @@ subscriber.notify(msg) end end end + # Should always return false. Except when the message is a 'dispatch' + # and it's for a participant only available to an 'engine_worker' + # (block participants, stateful participants) + # def cannot_handle (msg) return false if msg['action'] != 'dispatch' @context.engine.nil? && msg['for_engine_worker?'] @@ -320,10 +309,11 @@ end raise_unknown_expression_error(exp_hash) unless exp_class exp = exp_class.new(@context, exp_hash.merge!('original_tree' => tree)) + exp.initial_persist exp.do_apply end def raise_unknown_expression_error (exp_hash) @@ -355,14 +345,11 @@ tree[1]['ref'] = key tree[1]['original_ref'] = tree[0] if key != tree[0] if sub - [ Ruote::Exp::SubprocessExpression, [ 'subprocess', *tree[1..2] ] ] - else - [ Ruote::Exp::ParticipantExpression, [ 'participant', *tree[1..2] ] ] end else [ nil, tree ]