lib/ruote/worker.rb in ruote-2.1.9 vs lib/ruote/worker.rb in ruote-2.1.10
- old
+ new
@@ -25,13 +25,18 @@
require 'ruote/fei'
module Ruote
+ #
+ # Workers fetch 'msgs' and 'schedules' from the storage and process them.
+ #
+ # Read more at http://ruote.rubyforge.org/configuration.html
+ #
class Worker
- EXP_ACTIONS = %w[ reply cancel fail receive ]
+ EXP_ACTIONS = %w[ reply cancel fail receive dispatched ]
# 'apply' is comprised in 'launch'
# 'receive' is a ParticipantExpression alias for 'reply'
PROC_ACTIONS = %w[ cancel_process kill_process ]
@@ -41,22 +46,24 @@
attr_reader :run_thread
attr_reader :running
def initialize (storage)
- @storage = storage
-
@subscribers = []
- @context = Ruote::Context.new(@storage, self)
+ # must be ready before the storage is created
+ # services like Logger to subscribe to the worker
+ @storage = storage
+ @context = Ruote::Context.new(storage, self)
+
@last_time = Time.at(0.0).utc # 1970...
@running = true
@run_thread = nil
@msgs = []
- @sleep_time = 0.001
+ @sleep_time = 0.000
end
def run
while(@running) do
@@ -64,11 +71,11 @@
end
end
def run_in_thread
- #Thread.abort_on_exception = true
+ Thread.abort_on_exception = true
# TODO : remove me at some point
@running = true
@run_thread = Thread.new { run }
@@ -80,66 +87,44 @@
end
def shutdown
@running = false
- @run_thread.join if @run_thread
- end
- # This method is public, since it's used by the DispatchPool when
- # reporting an error that occurred in the dispatch/consume thread of
- # a participant.
- #
- def handle_exception (msg, fexp, ex)
+ return unless @run_thread
- wfid = msg['wfid'] || (msg['fei']['wfid'] rescue nil)
- fei = msg['fei'] || (fexp.h.fei rescue nil)
-
- # debug only
-
- if ARGV.include?('-d')
-
- puts "\n== worker intercepted error =="
- puts
- p ex
- ex.backtrace[0, 10].each { |l| puts l }
- puts "..."
- puts
- puts "-- msg --"
- msg.keys.sort.each { |k|
- puts " #{k.inspect} =>\n#{msg[k].inspect}"
- }
- puts "-- . --"
- puts
+ begin
+ @run_thread.join
+ rescue Exception => e
end
+ end
- # on_error ?
+ # Returns true if the engine system is inactive, ie if all the process
+ # instances are terminated or are stuck in an error.
+ #
+ # NOTE : for now, if a branch of a process is in errors while another is
+ # still running, this methods will still consider the process instance
+ # as inactive (and it will return true if all the processes are considered
+ # inactive).
+ #
+ def inactive?
- if not(fexp) && fei
- fexp = Ruote::Exp::FlowExpression.fetch(@context, fei)
- end
+ # the cheaper tests first
- return if fexp && fexp.handle_on_error
+ return false if @msgs.size > 0
+ return false unless @context.storage.empty?('schedules')
+ return false unless @context.storage.empty?('msgs')
- # emit 'msg'
+ wfids = @context.storage.get_many('expressions').collect { |exp|
+ exp['fei']['wfid']
+ }.sort.uniq
- @storage.put_msg(
- 'error_intercepted',
- 'message' => ex.inspect,
- 'wfid' => wfid,
- 'msg' => msg)
+ error_wfids = @context.storage.get_many('errors').collect { |err|
+ err['fei']['wfid']
+ }.sort.uniq
- # fill error in the error journal
-
- @storage.put(
- 'type' => 'errors',
- '_id' => "err_#{Ruote.to_storage_id(fei)}",
- 'message' => ex.inspect,
- 'trace' => ex.backtrace.join("\n"),
- 'fei' => fei,
- 'msg' => msg
- ) if fei
+ (wfids - error_wfids == [])
end
protected
def step
@@ -184,37 +169,37 @@
#print r == false ? '*' : '.'
break if Time.now.utc - @last_time >= 0.8
end
- #puts processed.to_s
+ #p processed
if processed == 0
@sleep_time += 0.001
@sleep_time = 0.499 if @sleep_time > 0.499
sleep(@sleep_time)
else
- @sleep_time = 0.001
+ @sleep_time = 0.000
end
end
def trigger (schedule)
msg = Ruote.fulldup(schedule['msg'])
- return false unless @storage.delete(schedule).nil?
+ return false unless @storage.reserve(schedule)
@storage.put_msg(msg.delete('action'), msg)
true
end
def process (msg)
return false if cannot_handle(msg)
- return false unless @storage.delete(msg).nil?
+ return false unless @storage.reserve(msg)
begin
action = msg['action']
@@ -240,13 +225,13 @@
# msg got deleted, might still be interesting for a subscriber
end
notify(msg)
- rescue Exception => ex
+ rescue Exception => exception
- handle_exception(msg, nil, ex)
+ @context.error_handler.msg_handle(msg, exception)
end
true
end
@@ -258,10 +243,14 @@
subscriber.notify(msg)
end
end
end
+ # Should always return false. Except when the message is a 'dispatch'
+ # and it's for a participant only available to an 'engine_worker'
+ # (block participants, stateful participants)
+ #
def cannot_handle (msg)
return false if msg['action'] != 'dispatch'
@context.engine.nil? && msg['for_engine_worker?']
@@ -320,10 +309,11 @@
end
raise_unknown_expression_error(exp_hash) unless exp_class
exp = exp_class.new(@context, exp_hash.merge!('original_tree' => tree))
+
exp.initial_persist
exp.do_apply
end
def raise_unknown_expression_error (exp_hash)
@@ -355,14 +345,11 @@
tree[1]['ref'] = key
tree[1]['original_ref'] = tree[0] if key != tree[0]
if sub
-
[ Ruote::Exp::SubprocessExpression, [ 'subprocess', *tree[1..2] ] ]
-
else
-
[ Ruote::Exp::ParticipantExpression, [ 'participant', *tree[1..2] ] ]
end
else
[ nil, tree ]