lib/ruote/part/local_participant.rb in ruote-2.1.9 vs lib/ruote/part/local_participant.rb in ruote-2.1.10

- old
+ new

@@ -20,33 +20,125 @@ # THE SOFTWARE. # # Made in Japan. #++ +require 'ruote/receiver/base' + module Ruote # # Provides methods for 'local' participants. # # Assumes the class that includes this module has a #context method # that points to the worker or engine ruote context. # + # It's "local" because it has access to the ruote storage. + # module LocalParticipant + include ReceiverMixin + # the reply_to_engine method is there + attr_accessor :context - # Sends back the workitem to the ruote engine. + # Use this method to re_dispatch the workitem. # - def reply_to_engine (workitem) + # It takes two options :in and :at for "later re_dispatch". + # + # Look at the unschedule_re_dispatch method for an example of + # participant implementation that uses re_dispatch. + # + # Without one of those options, the method is a "reject". + # + def re_dispatch (workitem, opts={}) - # the local participant knows how to deal with the storage directly - - @context.storage.put_msg( - 'receive', + msg = { + 'action' => 'dispatch', 'fei' => workitem.h.fei, 'workitem' => workitem.h, - 'participant_name' => workitem.participant_name) + 'participant_name' => workitem.participant_name, + 'rejected' => true + } + + if t = opts[:in] || opts[:at] + + sched_id = @context.storage.put_schedule('at', workitem.h.fei, t, msg) + + fexp = fetch_flow_expression(workitem) + fexp.h['re_dispatch_sched_id'] = sched_id + fexp.try_persist + + else + + @context.storage.put_msg('dispatch', msg) + end end + + # Cancels the scheduled re_dispatch, if any. + # + # An example or 'retrying participant' : + # + # class RetryParticipant + # include Ruote::LocalParticipant + # + # def initialize (opts) + # @opts = opts + # end + # + # def consume (workitem) + # begin + # do_the_job + # reply(workitem) + # rescue + # re_dispatch(workitem, :in => @opts['delay'] || '1s') + # end + # end + # + # def cancel (fei, flavour) + # unschedule_re_dispatch(fei) + # end + # end + # + # Note how unschedule_re_dispatch is used in the cancel method. Warning, + # this example could loop forever... + # + def unschedule_re_dispatch (fei) + + fexp = Ruote::Exp::FlowExpression.fetch( + @context, Ruote::FlowExpressionId.extract_h(fei)) + + if s = fexp.h['re_dispatch_sched_id'] + @context.storage.delete_schedule(s) + end + end + + # WARNING : this method is only for 'stateless' participants, ie + # participants that are registered in the engine by passing their class + # and a set of options, like in + # + # engine.register_participant 'alpha', MyParticipant, 'info' => 'none' + # + # This reject method replaces the workitem in the [internal] message queue + # of the ruote engine (since it's a local participant, it has access to + # the storage and it's thus easy). + # The idea is that another worker will pick up the workitem and + # do the participant dispatching. + # + # This is an advanced technique. It was requested by people who + # want to have multiple workers and have only certain worker/participants + # do the handling. + # Using reject is not the best method, it's probably better to implement + # this by re-opening the Ruote::Worker class and changing the + # cannot_handle(msg) method. + # + # reject could be useful anyway, not sure now, but one could imagine + # scenarii where some participants reject workitems temporarily (while + # the same participant on another worker would accept it). + # + # Well, here it is, use with care. + # + alias :reject :re_dispatch end end