lib/ruote/part/local_participant.rb in ruote-2.2.0 vs lib/ruote/part/local_participant.rb in ruote-2.3.0

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -21,10 +21,11 @@ # # Made in Japan. #++ require 'ruote/receiver/base' +require 'ruote/svc/dispatch_pool' module Ruote # @@ -36,80 +37,175 @@ # It's "local" because it has access to the ruote storage. # module LocalParticipant include ReceiverMixin - # the reply_to_engine method is there + # The engine context, it's a local participant so it knows about the + # context in which the engine operates... + # attr_accessor :context + # Usually set right before a call to #on_workitem or #accept? + # + attr_writer :workitem + + # Usually set right before a call to #on_cancel or #cancel + # + attr_writer :fei + + # Usually set right before a call to #on_cancel or #cancel + # + attr_accessor :flavour + + # Returns the current workitem if no fei is given. + # If a fei is given, it will return the applied workitem for that fei + # (if any). + # + # The optional fei is mostly here for backward compatibility (with 2.2.0) + # + def workitem(fei=nil) + + return fetch_workitem(fei) if fei + + @workitem ? @workitem : applied_workitem + end + + # Returns the current fei (Ruote::FlowExpressionId). + # + def fei + + @fei ? @fei : @workitem.fei + end + + # Returns the Ruote::ParticipantExpression that corresponds with this + # participant. + # + # If a wi_or_fei arg is given, will return the corresponding + # flow expression. This arg is mostly here for backward compatibility. + # + def fexp(wi_or_fei=nil) + + flow_expression(wi_or_fei || fei) + end + + # Returns the workitem as was applied when the Ruote::ParticipantExpression + # was reached. + # + # If the _fei arg is specified, it will return the corresponding applied + # workitem. This args is mostly here for backward compatibility. + # + def applied_workitem(_fei=nil) + + fetch_workitem(_fei || fei) + end + + # Up until ruote 2.3.0, the participant name had to be fetched from the + # workitem. This is a shortcut, it lets you write participant code + # that look like + # + # def on_workitem + # (workitem.fields['supervisors'] || []) << participant_name + # reply + # end + # + def participant_name + + workitem.participant_name + end + + # A shortcut for + # + # fexp.lookup_variable(key) + # + def lookup_variable(key) + + fexp.lookup_variable(key) + end + + # Participant implementations call this method when their #on_workitem + # (#consume) methods are done and they want to hand back the workitem + # to the engine so that the flow can resume. + # + # the (wi=workitem) is mostly for backward compatibility (or for passing a + # totally different workitem to the engine). + # + def reply_to_engine(wi=workitem) + + receive(wi) + end + + alias reply reply_to_engine + # Use this method to re_dispatch the workitem. # # It takes two options :in and :at for "later re_dispatch". # # Look at the unschedule_re_dispatch method for an example of # participant implementation that uses re_dispatch. # # Without one of those options, the method is a "reject". # - def re_dispatch(workitem, opts={}) + def re_dispatch(wi=nil, opts=nil) + wi, opts = [ nil, wi ] if wi.is_a?(Hash) && opts.nil? + wi ||= workitem() + opts ||= {} + + wi.h['re_dispatch_count'] = wi.h['re_dispatch_count'].to_s.to_i + 1 + msg = { 'action' => 'dispatch', - 'fei' => workitem.h.fei, - 'workitem' => workitem.h, - 'participant_name' => workitem.participant_name, - 'rejected' => true + 'fei' => wi.h.fei, + 'workitem' => wi.h, + 'participant_name' => wi.participant_name } if t = opts[:in] || opts[:at] - sched_id = @context.storage.put_schedule('at', workitem.h.fei, t, msg) + sched_id = @context.storage.put_schedule('at', wi.h.fei, t, msg) - fexp = fetch_flow_expression(workitem) - fexp.h['re_dispatch_sched_id'] = sched_id - fexp.try_persist + exp = fexp(wi) + exp.h['re_dispatch_sched_id'] = sched_id + exp.try_persist else @context.storage.put_msg('dispatch', msg) end end # Cancels the scheduled re_dispatch, if any. # - # An example or 'retrying participant' : + # An example of 'retrying participant' : # # class RetryParticipant # include Ruote::LocalParticipant # # def initialize(opts) # @opts = opts # end # - # def consume(workitem) + # def on_workitem # begin # do_the_job - # reply(workitem) + # reply # rescue - # re_dispatch(workitem, :in => @opts['delay'] || '1s') + # re_dispatch(:in => @opts['delay'] || '1s') # end # end # - # def cancel(fei, flavour) - # unschedule_re_dispatch(fei) + # def cancel + # unschedule_re_dispatch # end # end # # Note how unschedule_re_dispatch is used in the cancel method. Warning, # this example could loop forever... # - def unschedule_re_dispatch(fei) + def unschedule_re_dispatch(fei=nil) - fexp = Ruote::Exp::FlowExpression.fetch( - @context, Ruote::FlowExpressionId.extract_h(fei)) - if s = fexp.h['re_dispatch_sched_id'] @context.storage.delete_schedule(s) end end @@ -131,8 +227,112 @@ # the same participant on another worker would accept it). # # Well, here it is, use with care. # alias :reject :re_dispatch + + #-- + # test methods + # prefixed with an underscore + #++ + + # Test shortcut, alleviates the need to set the workitem before calling + # consume / on_workitem. + # + def _on_workitem(wi) + Ruote.participant_send( + self, [ :on_workitem, :consume ], 'workitem' => wi) + end + alias _consume _on_workitem + + # Test shortcut, alleviates the need to set fei and flavour before calling + # cancel / on_consume. + # + def _on_cancel(fei, flavour) + Ruote.participant_send( + self, [ :on_cancel, :cancel ], 'fei' => fei, 'flavour' => flavour) + end + alias _cancel _on_cancel + + # Test shortcut, alleviates the need to set the workitem before calling + # on_reply. + # + def _on_reply(wi) + Ruote.participant_send(self, :on_reply, 'workitem' => wi) + end + + # Test shortcut, alleviates the need to set the workitem before calling + # accept? + # + def _accept?(wi) + Ruote.participant_send(self, :accept?, 'workitem' => wi) + end + + # Test shortcut, alleviates the need to set the workitem before calling + # dont_thread?, do_not_thread? or do_not_thread. + # + def _dont_thread?(wi) + Ruote.participant_send( + self, + [ :dont_thread?, :do_not_thread?, :do_not_thread ], + 'workitem' => wi) + end + alias _do_not_thread _dont_thread? + alias _do_not_thread? _dont_thread? + + # Test shortcut, alleviates the need to set the workitem before calling + # rtimeout. + # + def _rtimeout(wi) + Ruote.participant_send(self, :rtimeout, 'workitem' => wi) + end + + # Returns true if the underlying participant expression is 'gone' (probably + # cancelled somehow). + # + def is_gone? + + fexp.nil? + end + + # Returns true if the underlying participant expression is gone or + # cancelling. + # + def is_cancelled? + + if fe = fexp + return fe.h.state == 'cancelling' + else + true + end + end + + alias is_canceled? is_cancelled? + + protected + + # Receivers and local participants share the #stash_put and #stash_get + # methods. The local participant has #put and #get which don't need + # an initial fei, thus #get and #put deal with the participant + # expression directly, whereas stash_put and stash_get can point at + # any expression. + # + # 'put' can be called as + # + # put('secret' => 'message', 'to' => 'embassy') + # # or + # put('secret', 'message') + # + def put(key, value=nil) + + stash_put(fei, key, value) + end + + # See #put + # + def get(key=nil) + + stash_get(fei, key) + end end end