lib/ruote/worker.rb in ruote-2.1.11 vs lib/ruote/worker.rb in ruote-2.2.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -45,11 +45,13 @@
attr_reader :context
attr_reader :run_thread
attr_reader :running
- def initialize (storage)
+ # Given a storage, creates a new instance of a Worker.
+ #
+ def initialize(storage)
@subscribers = []
# must be ready before the storage is created
# services like Logger to subscribe to the worker
@@ -91,21 +93,25 @@
def join
@run_thread.join if @run_thread
end
- def subscribe (actions, subscriber)
+ # Loggers and trackers call this method when subscribing for events /
+ # actions in this worker.
+ #
+ def subscribe(actions, subscriber)
@subscribers << [ actions, subscriber ]
end
+ # Shuts down this worker (makes sure it won't fetch further messages
+ # and schedules).
+ #
def shutdown
@running = false
- return unless @run_thread
-
begin
@run_thread.join
rescue Exception => e
end
end
@@ -136,10 +142,13 @@
(wfids - error_wfids == [])
end
protected
+ # One worker step, fetches schedules and triggers those whose time has
+ # came, then fetches msgs and processes them.
+ #
def step
now = Time.now.utc
delta = now - @last_time
@@ -147,13 +156,11 @@
#
# at most once per second, deal with 'ats' and 'crons'
@last_time = now
- @storage.get_schedules(delta, now).each do |sche|
- trigger(sche)
- end
+ @storage.get_schedules(delta, now).each { |sche| trigger(sche) }
end
# msgs
@msgs = @storage.get_msgs if @msgs.empty?
@@ -191,25 +198,40 @@
else
@sleep_time = 0.000
end
end
- def trigger (schedule)
+ # Given a schedule, attempts to trigger it.
+ #
+ # It first tries to
+ # reserve the schedule. If the reservation fails (another worker
+ # was successful probably), false is returned. The schedule is
+ # triggered if the reservation was successful, true is returned.
+ #
+ def trigger(schedule)
msg = Ruote.fulldup(schedule['msg'])
return false unless @storage.reserve(schedule)
@storage.put_msg(msg.delete('action'), msg)
true
end
- def process (msg)
+ # Processes one msg.
+ #
+ # Will return false immediately if the msg reservation failed (another
+ # worker grabbed the message.
+ #
+ # Else will execute the action ordered in the msg, and return true.
+ #
+ # Exceptions in execution are intercepted here and passed to the
+ # engine's (context's) error_handler.
+ #
+ def process(msg)
- return false if cannot_handle(msg)
-
return false unless @storage.reserve(msg)
begin
action = msg['action']
@@ -236,42 +258,37 @@
# msg got deleted, might still be interesting for a subscriber
end
notify(msg)
- rescue Exception => exception
+ rescue => exception
@context.error_handler.msg_handle(msg, exception)
end
true
end
- def notify (msg)
+ # Given a successfully executed msg, now notifies all the subscribers
+ # interested in the kind of action the msg ordered.
+ #
+ def notify(msg)
@subscribers.each do |actions, subscriber|
if actions == :all || actions.include?(msg['action'])
subscriber.notify(msg)
end
end
end
- # Should always return false. Except when the message is a 'dispatch'
- # and it's for a participant only available to an 'engine_worker'
- # (block participants, stateful participants)
- #
- def cannot_handle (msg)
-
- return false if msg['action'] != 'dispatch'
-
- @context.engine.nil? && msg['for_engine_worker?']
- end
-
# Works for both the 'launch' and the 'apply' msgs.
#
- def launch (msg)
+ # Creates a new expression, gives and applies it with the
+ # workitem contained in the msg.
+ #
+ def launch(msg)
tree = msg['tree']
variables = msg['variables']
exp_class = @context.expmap.expression_class(tree.first)
@@ -281,11 +298,11 @@
exp_hash = {
'fei' => msg['fei'] || {
'engine_id' => @context.engine_id,
'wfid' => msg['wfid'],
- 'sub_wfid' => msg['sub_wfid'],
+ 'subid' => Ruote.generate_subid(msg.inspect),
'expid' => '0' },
'parent_id' => msg['parent_id'],
'original_tree' => tree,
'variables' => variables,
'applied_workitem' => msg['workitem'],
@@ -294,32 +311,38 @@
if not exp_class
exp_class = Ruote::Exp::RefExpression
- #elsif msg['action'] == 'launch' && exp_class == Ruote::Exp::DefineExpression
elsif is_launch?(msg, exp_class)
def_name, tree = Ruote::Exp::DefineExpression.reorganize(tree)
variables[def_name] = [ '0', tree ] if def_name
exp_class = Ruote::Exp::SequenceExpression
end
exp = exp_class.new(@context, exp_hash.merge!('original_tree' => tree))
exp.initial_persist
- exp.do_apply
+ exp.do_apply(msg)
end
- def is_launch? (msg, exp_class)
+ # Returns true if the msg is a "launch" (ie not a simply "apply").
+ #
+ def is_launch?(msg, exp_class)
return false if exp_class != Ruote::Exp::DefineExpression
return true if msg['action'] == 'launch'
(msg['trigger'] == 'on_re_apply')
end
- def cancel_process (msg)
+ # Handles a 'cancel_process' msg (translates it into a "cancel root
+ # expression of that process" msg).
+ #
+ # Also works for 'kill_process' msgs.
+ #
+ def cancel_process(msg)
root = @storage.find_root_expression(msg['wfid'])
return unless root
@@ -330,9 +353,9 @@
'fei' => root['fei'],
'wfid' => msg['wfid'], # indicates this was triggered by cancel_process
'flavour' => flavour)
end
- alias :kill_process :cancel_process
+ alias kill_process cancel_process
end
end