lib/ruote/worker.rb in ruote-2.3.0.1 vs lib/ruote/worker.rb in ruote-2.3.0.2
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2013, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -25,11 +25,26 @@
require 'ruote/fei'
module Ruote
+ # A helper for the #worker method, it returns that dummy worker
+ # when there is no reference to the calling worker in the current
+ # thread's local variables.
#
+ DUMMY_WORKER = OpenStruct.new(
+ :name => 'worker', :identity => 'unknown', :state => 'running')
+
+ # Warning, this is not equivalent to doing @context.worker, this method
+ # fetches the worker from the local thread variables.
+ #
+ def self.current_worker
+
+ Thread.current['ruote_worker'] || DUMMY_WORKER
+ end
+
+ #
# Workers fetch 'msgs' and 'schedules' from the storage and process them.
#
# Read more at http://ruote.rubyforge.org/configuration.html
#
class Worker
@@ -337,14 +352,14 @@
# The schedule is triggered if the reservation was successful, true
# is returned.
#
def turn_schedule_to_msg(schedule)
- msg = Ruote.fulldup(schedule['msg'])
-
return false unless @storage.reserve(schedule)
+ msg = Ruote.fulldup(schedule['msg'])
+
@storage.put_msg(msg.delete('action'), msg)
true
end
@@ -388,11 +403,11 @@
reput(msg)
when 'raise'
- handle_msg_error(msg['msg'], msg['error'])
+ @context.error_handler.msg_handle(msg['msg'], msg['error'])
when 'respark'
respark(msg)
@@ -404,30 +419,21 @@
@context.notify(msg)
# notify subscribers of successfully processed msgs
rescue => err
- handle_msg_error(msg, err)
+ @context.error_handler.msg_handle(msg, err)
end
@context.storage.done(msg) if @context.storage.respond_to?(:done)
@info << msg if @info
# for the stats
true
end
- # Passes the msg and the err it resulted in to the error_handler.
- #
- # Some storage/worker implementation may want to override this.
- #
- def handle_msg_error(msg, err)
-
- @context.error_handler.msg_handle(msg, err)
- end
-
# Works for both the 'launch' and the 'apply' msgs.
#
# Creates a new expression, gives and applies it with the
# workitem contained in the msg.
#
@@ -437,12 +443,12 @@
variables = msg['variables']
wi = msg['workitem']
exp_class = @context.expmap.expression_class(tree.first)
- # msg['wfid'] only : it's a launch
- # msg['fei'] : it's a sub launch (a supplant ?)
+ # msg['wfid'] only: it's a launch
+ # msg['fei']: it's a sub launch (a supplant ?)
if is_launch?(msg, exp_class)
name = tree[1]['name'] || tree[1].keys.find { |k| tree[1][k] == nil }
revision = tree[1]['revision'] || tree[1]['rev']
@@ -455,24 +461,30 @@
wi['sub_wf_revision'] = revision
wi['sub_wf_launched_at'] = Ruote.now_to_utc_s
end
exp_hash = {
+
'fei' => msg['fei'] || {
'engine_id' => @context.engine_id,
'wfid' => msg['wfid'],
'subid' => Ruote.generate_subid(msg.inspect),
'expid' => msg['expid'] || '0' },
+
'parent_id' => msg['parent_id'],
'variables' => variables,
'applied_workitem' => wi,
+
'forgotten' => msg['forgotten'],
'lost' => msg['lost'],
'flanking' => msg['flanking'],
+ 'attached' => msg['attached'],
+ 'supplanted' => msg['supplanted'],
+
'stash' => msg['stash'],
'trigger' => msg['trigger'],
- 'on_reply' => msg['on_reply'],
- 'supplanted' => msg['supplanted'] }
+ 'on_reply' => msg['on_reply']
+ }
if not exp_class
exp_class = Ruote::Exp::RefExpression