lib/ruote/engine.rb in ruote-2.1.9 vs lib/ruote/engine.rb in ruote-2.1.10

- old
+ new

@@ -27,73 +27,67 @@ require 'ruote/receiver/base' module Ruote + # + # This class holds the 'engine' name, perhaps 'dashboard' would have been + # a better name. Anyway, the methods here allow to launch processes + # and to query about their status. There are also methods for fixing + # issues with stalled processes or processes stuck in errors. + # + # NOTE : the methods #launch and #reply are implemented in + # Ruote::ReceiverMixin + # class Engine include ReceiverMixin - attr_reader :storage - attr_reader :worker attr_reader :context attr_reader :variables def initialize (worker_or_storage, run=true) - if worker_or_storage.respond_to?(:storage) + @context = worker_or_storage.context + @context.engine = self - @worker = worker_or_storage - @storage = @worker.storage - @context = @worker.context - @context.engine = self - else + @variables = EngineVariables.new(@context.storage) - @worker = nil - @storage = worker_or_storage - @context = Ruote::Context.new(@storage, self) - end + @context.worker.run_in_thread if @context.worker && run + # launch the worker if there is one + end - @variables = EngineVariables.new(@storage) + def storage - @worker.run_in_thread if @worker && run + @context.storage end - def launch (process_definition, fields={}, variables={}) + def worker - wfid = @context.wfidgen.generate - - @storage.put_msg( - 'launch', - 'wfid' => wfid, - 'tree' => @context.parser.parse(process_definition), - 'workitem' => { 'fields' => fields }, - 'variables' => variables) - - wfid + @context.worker end def cancel_process (wfid) - @storage.put_msg('cancel_process', 'wfid' => wfid) + @context.storage.put_msg('cancel_process', 'wfid' => wfid) end def kill_process (wfid) - @storage.put_msg('kill_process', 'wfid' => wfid) + @context.storage.put_msg('kill_process', 'wfid' => wfid) end def cancel_expression (fei) fei = fei.to_h if fei.respond_to?(:to_h) - @storage.put_msg('cancel', 'fei' => fei) + @context.storage.put_msg('cancel', 'fei' => fei) end def kill_expression (fei) fei = fei.to_h if fei.respond_to?(:to_h) - @storage.put_msg('cancel', 'fei' => fei, 'flavour' => 'kill') + @context.storage.put_msg('cancel', 'fei' => fei, 'flavour' => 'kill') end # Replays at a given error (hopefully you fixed the cause of the error # before replaying...) # @@ -111,13 +105,13 @@ # exp = Ruote::Exp::FlowExpression.fetch(@context, fei) exp.unpersist_or_raise if exp end - @storage.delete(err.to_h) # remove error + @context.storage.delete(err.to_h) # remove error - @storage.put_msg(action, msg) # trigger replay + @context.storage.put_msg(action, msg) # trigger replay end # Re-applies an expression (given via its FlowExpressionId). # # That will cancel the expression and, once the cancel operation is over @@ -146,11 +140,11 @@ # Returns a ProcessStatus instance describing the current status of # a process instance. # def process (wfid) - exps = @storage.get_many('expressions', /!#{wfid}$/) + exps = @context.storage.get_many('expressions', /!#{wfid}$/) errs = self.errors( wfid ) return nil if exps.empty? && errs.empty? ProcessStatus.new(@context, exps, errs) @@ -160,11 +154,11 @@ # # WARNING : this is an expensive operation. # def processes - exps = @storage.get_many('expressions') + exps = @context.storage.get_many('expressions') errs = self.errors by_wfid = {} exps.each do |exp| @@ -179,14 +173,17 @@ # Returns an array of current errors (hashes) # def errors( wfid = nil ) wfid.nil? ? - @storage.get_many('errors') : - @storage.get_many('errors', /!#{wfid}$/) + @context.storage.get_many('errors') : + @context.storage.get_many('errors', /!#{wfid}$/) end + # Shuts down the engine, mostly passes the shutdown message to the other + # services and hope they'll shut down properly. + # def shutdown @context.shutdown end @@ -205,19 +202,28 @@ # # engine.wait_for(5) # # will make the current thread block until 5 messages have been # # processed on the workqueue... # - def wait_for (item) + # engine.wait_for(:empty) + # # will return as soon as the engine/storage is empty, ie as soon + # # as there are no more processes running in the engine (no more + # # expressions placed in the storage) + # + # It's OK to wait for multiple wfids : + # + # engine.wait_for('20100612-bezerijozo', '20100612-yakisoba') + # + def wait_for (*items) logger = @context['s_logger'] raise( "can't wait_for, there is no logger that responds to that call" ) unless logger.respond_to?(:wait_for) - logger.wait_for(item) + logger.wait_for(items) end # Loads and parses the process definition at the given path. # def load_definition (path) @@ -253,41 +259,46 @@ # end # end # engine.register_participant /^moon-.+/, MyParticipant.new('Saturn-V') # # - # == passing a block to a participant + # == 'stateless' participants are preferred over 'stateful' ones # - # Usually only the BlockParticipant cares about being passed a block : + # Ruote 2.1 is OK with 1 storage and 1+ workers. The workers may be + # in other ruby runtimes. This implies that if you have registered a + # participant instance (instead of passing its classname and options), + # that participant will only run in the worker 'embedded' in the engine + # where it was registered... Let me rephrase it, participants instantiated + # at registration time (and that includes block participants) only runs + # in one worker, always the same. # - # engine.register_participant 'compute_sum' do |workitem| - # workitem.fields['kilroy'] = 'was here' - # end + # 'stateless' participants, instantiated at each dispatch, are preferred. + # Any worker can handle them. # - # But it's OK to pass a block to a custom participant : + # Block participants are still fine for demos (where the worker is included + # in the engine (see all the quickstarts). And small engines with 1 worker + # are not that bad, not everybody is building huge systems). # - # require 'ruote/part/local_participant' + # Here is a 'stateless' participant example : # - # class MyParticipant - # include Ruote::LocalParticipant + # class MyStatelessParticipant # def initialize (opts) - # @name = opts[:name] - # @block = opts[:block] + # @opts = opts # end # def consume (workitem) - # workitem.fields['prestamp'] = Time.now - # workitem.fields['author'] = @name - # @block.call(workitem) - # reply_to_engine(workitem) + # workitem.fields['rocket_name'] = @opts['name'] + # send_to_the_moon(workitem) # end + # def cancel (fei, flavour) + # # do nothing + # end # end # - # engine.register_participant 'al', MyParticipant, :name => 'toto' do |wi| - # wi.fields['nada'] = surf - # end + # engine.register_participant 'moon', MyStatelessParticipant, 'name' => 'saturn5' # - # The block is available under the :block option. + # Remember that the options (the hash that follows the class name), must be + # serialisable via JSON. # def register_participant (regex, participant=nil, opts={}, &block) pa = @context.plist.register(regex, participant, opts, block) @@ -338,9 +349,30 @@ # @engine.configure('ruby_eval_allowed', true) # def configure (config_key, value) @context[config_key] = value + end + + # A convenience methods for advanced users (like Oleg). + # + # Given a fei (flow expression id), fetches the workitem as stored in + # the expression with that fei. + # This is the "applied workitem", if the workitem is currently handed to + # a participant, this method will return the workitem as applied, not + # the workitem as saved by the participant/user in whatever worklist it + # uses. If you need that workitem, do the vanilla thing and ask it to + # the [storage] participant or its worklist. + # + # The fei might be a string fei (result of fei.to_storage_id), a + # FlowExpressionId instance or a hash. + # + def workitem (fei) + + fexp = Ruote::Exp::FlowExpression.fetch( + @context, Ruote::FlowExpressionId.extract_h(fei)) + + Ruote::Workitem.new(fexp.h.applied_workitem) end end # # A wrapper class giving easy access to engine variables.