lib/ruote/worker.rb in ruote-2.3.0.1 vs lib/ruote/worker.rb in ruote-2.3.0.2

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2013, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -25,11 +25,26 @@ require 'ruote/fei' module Ruote + # A helper for the #worker method, it returns that dummy worker + # when there is no reference to the calling worker in the current + # thread's local variables. # + DUMMY_WORKER = OpenStruct.new( + :name => 'worker', :identity => 'unknown', :state => 'running') + + # Warning, this is not equivalent to doing @context.worker, this method + # fetches the worker from the local thread variables. + # + def self.current_worker + + Thread.current['ruote_worker'] || DUMMY_WORKER + end + + # # Workers fetch 'msgs' and 'schedules' from the storage and process them. # # Read more at http://ruote.rubyforge.org/configuration.html # class Worker @@ -337,14 +352,14 @@ # The schedule is triggered if the reservation was successful, true # is returned. # def turn_schedule_to_msg(schedule) - msg = Ruote.fulldup(schedule['msg']) - return false unless @storage.reserve(schedule) + msg = Ruote.fulldup(schedule['msg']) + @storage.put_msg(msg.delete('action'), msg) true end @@ -388,11 +403,11 @@ reput(msg) when 'raise' - handle_msg_error(msg['msg'], msg['error']) + @context.error_handler.msg_handle(msg['msg'], msg['error']) when 'respark' respark(msg) @@ -404,30 +419,21 @@ @context.notify(msg) # notify subscribers of successfully processed msgs rescue => err - handle_msg_error(msg, err) + @context.error_handler.msg_handle(msg, err) end @context.storage.done(msg) if @context.storage.respond_to?(:done) @info << msg if @info # for the stats true end - # Passes the msg and the err it resulted in to the error_handler. - # - # Some storage/worker implementation may want to override this. - # - def handle_msg_error(msg, err) - - @context.error_handler.msg_handle(msg, err) - end - # Works for both the 'launch' and the 'apply' msgs. # # Creates a new expression, gives and applies it with the # workitem contained in the msg. # @@ -437,12 +443,12 @@ variables = msg['variables'] wi = msg['workitem'] exp_class = @context.expmap.expression_class(tree.first) - # msg['wfid'] only : it's a launch - # msg['fei'] : it's a sub launch (a supplant ?) + # msg['wfid'] only: it's a launch + # msg['fei']: it's a sub launch (a supplant ?) if is_launch?(msg, exp_class) name = tree[1]['name'] || tree[1].keys.find { |k| tree[1][k] == nil } revision = tree[1]['revision'] || tree[1]['rev'] @@ -455,24 +461,30 @@ wi['sub_wf_revision'] = revision wi['sub_wf_launched_at'] = Ruote.now_to_utc_s end exp_hash = { + 'fei' => msg['fei'] || { 'engine_id' => @context.engine_id, 'wfid' => msg['wfid'], 'subid' => Ruote.generate_subid(msg.inspect), 'expid' => msg['expid'] || '0' }, + 'parent_id' => msg['parent_id'], 'variables' => variables, 'applied_workitem' => wi, + 'forgotten' => msg['forgotten'], 'lost' => msg['lost'], 'flanking' => msg['flanking'], + 'attached' => msg['attached'], + 'supplanted' => msg['supplanted'], + 'stash' => msg['stash'], 'trigger' => msg['trigger'], - 'on_reply' => msg['on_reply'], - 'supplanted' => msg['supplanted'] } + 'on_reply' => msg['on_reply'] + } if not exp_class exp_class = Ruote::Exp::RefExpression