lib/ruote/engine.rb in ruote-2.1.9 vs lib/ruote/engine.rb in ruote-2.1.10
- old
+ new
@@ -27,73 +27,67 @@
require 'ruote/receiver/base'
module Ruote
+ #
+ # This class holds the 'engine' name, perhaps 'dashboard' would have been
+ # a better name. Anyway, the methods here allow to launch processes
+ # and to query about their status. There are also methods for fixing
+ # issues with stalled processes or processes stuck in errors.
+ #
+ # NOTE : the methods #launch and #reply are implemented in
+ # Ruote::ReceiverMixin
+ #
class Engine
include ReceiverMixin
- attr_reader :storage
- attr_reader :worker
attr_reader :context
attr_reader :variables
def initialize (worker_or_storage, run=true)
- if worker_or_storage.respond_to?(:storage)
+ @context = worker_or_storage.context
+ @context.engine = self
- @worker = worker_or_storage
- @storage = @worker.storage
- @context = @worker.context
- @context.engine = self
- else
+ @variables = EngineVariables.new(@context.storage)
- @worker = nil
- @storage = worker_or_storage
- @context = Ruote::Context.new(@storage, self)
- end
+ @context.worker.run_in_thread if @context.worker && run
+ # launch the worker if there is one
+ end
- @variables = EngineVariables.new(@storage)
+ def storage
- @worker.run_in_thread if @worker && run
+ @context.storage
end
- def launch (process_definition, fields={}, variables={})
+ def worker
- wfid = @context.wfidgen.generate
-
- @storage.put_msg(
- 'launch',
- 'wfid' => wfid,
- 'tree' => @context.parser.parse(process_definition),
- 'workitem' => { 'fields' => fields },
- 'variables' => variables)
-
- wfid
+ @context.worker
end
def cancel_process (wfid)
- @storage.put_msg('cancel_process', 'wfid' => wfid)
+ @context.storage.put_msg('cancel_process', 'wfid' => wfid)
end
def kill_process (wfid)
- @storage.put_msg('kill_process', 'wfid' => wfid)
+ @context.storage.put_msg('kill_process', 'wfid' => wfid)
end
def cancel_expression (fei)
fei = fei.to_h if fei.respond_to?(:to_h)
- @storage.put_msg('cancel', 'fei' => fei)
+ @context.storage.put_msg('cancel', 'fei' => fei)
end
def kill_expression (fei)
fei = fei.to_h if fei.respond_to?(:to_h)
- @storage.put_msg('cancel', 'fei' => fei, 'flavour' => 'kill')
+ @context.storage.put_msg('cancel', 'fei' => fei, 'flavour' => 'kill')
end
# Replays at a given error (hopefully you fixed the cause of the error
# before replaying...)
#
@@ -111,13 +105,13 @@
#
exp = Ruote::Exp::FlowExpression.fetch(@context, fei)
exp.unpersist_or_raise if exp
end
- @storage.delete(err.to_h) # remove error
+ @context.storage.delete(err.to_h) # remove error
- @storage.put_msg(action, msg) # trigger replay
+ @context.storage.put_msg(action, msg) # trigger replay
end
# Re-applies an expression (given via its FlowExpressionId).
#
# That will cancel the expression and, once the cancel operation is over
@@ -146,11 +140,11 @@
# Returns a ProcessStatus instance describing the current status of
# a process instance.
#
def process (wfid)
- exps = @storage.get_many('expressions', /!#{wfid}$/)
+ exps = @context.storage.get_many('expressions', /!#{wfid}$/)
errs = self.errors( wfid )
return nil if exps.empty? && errs.empty?
ProcessStatus.new(@context, exps, errs)
@@ -160,11 +154,11 @@
#
# WARNING : this is an expensive operation.
#
def processes
- exps = @storage.get_many('expressions')
+ exps = @context.storage.get_many('expressions')
errs = self.errors
by_wfid = {}
exps.each do |exp|
@@ -179,14 +173,17 @@
# Returns an array of current errors (hashes)
#
def errors( wfid = nil )
wfid.nil? ?
- @storage.get_many('errors') :
- @storage.get_many('errors', /!#{wfid}$/)
+ @context.storage.get_many('errors') :
+ @context.storage.get_many('errors', /!#{wfid}$/)
end
+ # Shuts down the engine, mostly passes the shutdown message to the other
+ # services and hope they'll shut down properly.
+ #
def shutdown
@context.shutdown
end
@@ -205,19 +202,28 @@
#
# engine.wait_for(5)
# # will make the current thread block until 5 messages have been
# # processed on the workqueue...
#
- def wait_for (item)
+ # engine.wait_for(:empty)
+ # # will return as soon as the engine/storage is empty, ie as soon
+ # # as there are no more processes running in the engine (no more
+ # # expressions placed in the storage)
+ #
+ # It's OK to wait for multiple wfids :
+ #
+ # engine.wait_for('20100612-bezerijozo', '20100612-yakisoba')
+ #
+ def wait_for (*items)
logger = @context['s_logger']
raise(
"can't wait_for, there is no logger that responds to that call"
) unless logger.respond_to?(:wait_for)
- logger.wait_for(item)
+ logger.wait_for(items)
end
# Loads and parses the process definition at the given path.
#
def load_definition (path)
@@ -253,41 +259,46 @@
# end
# end
# engine.register_participant /^moon-.+/, MyParticipant.new('Saturn-V')
#
#
- # == passing a block to a participant
+ # == 'stateless' participants are preferred over 'stateful' ones
#
- # Usually only the BlockParticipant cares about being passed a block :
+ # Ruote 2.1 is OK with 1 storage and 1+ workers. The workers may be
+ # in other ruby runtimes. This implies that if you have registered a
+ # participant instance (instead of passing its classname and options),
+ # that participant will only run in the worker 'embedded' in the engine
+ # where it was registered... Let me rephrase it, participants instantiated
+ # at registration time (and that includes block participants) only runs
+ # in one worker, always the same.
#
- # engine.register_participant 'compute_sum' do |workitem|
- # workitem.fields['kilroy'] = 'was here'
- # end
+ # 'stateless' participants, instantiated at each dispatch, are preferred.
+ # Any worker can handle them.
#
- # But it's OK to pass a block to a custom participant :
+ # Block participants are still fine for demos (where the worker is included
+ # in the engine (see all the quickstarts). And small engines with 1 worker
+ # are not that bad, not everybody is building huge systems).
#
- # require 'ruote/part/local_participant'
+ # Here is a 'stateless' participant example :
#
- # class MyParticipant
- # include Ruote::LocalParticipant
+ # class MyStatelessParticipant
# def initialize (opts)
- # @name = opts[:name]
- # @block = opts[:block]
+ # @opts = opts
# end
# def consume (workitem)
- # workitem.fields['prestamp'] = Time.now
- # workitem.fields['author'] = @name
- # @block.call(workitem)
- # reply_to_engine(workitem)
+ # workitem.fields['rocket_name'] = @opts['name']
+ # send_to_the_moon(workitem)
# end
+ # def cancel (fei, flavour)
+ # # do nothing
+ # end
# end
#
- # engine.register_participant 'al', MyParticipant, :name => 'toto' do |wi|
- # wi.fields['nada'] = surf
- # end
+ # engine.register_participant 'moon', MyStatelessParticipant, 'name' => 'saturn5'
#
- # The block is available under the :block option.
+ # Remember that the options (the hash that follows the class name), must be
+ # serialisable via JSON.
#
def register_participant (regex, participant=nil, opts={}, &block)
pa = @context.plist.register(regex, participant, opts, block)
@@ -338,9 +349,30 @@
# @engine.configure('ruby_eval_allowed', true)
#
def configure (config_key, value)
@context[config_key] = value
+ end
+
+ # A convenience methods for advanced users (like Oleg).
+ #
+ # Given a fei (flow expression id), fetches the workitem as stored in
+ # the expression with that fei.
+ # This is the "applied workitem", if the workitem is currently handed to
+ # a participant, this method will return the workitem as applied, not
+ # the workitem as saved by the participant/user in whatever worklist it
+ # uses. If you need that workitem, do the vanilla thing and ask it to
+ # the [storage] participant or its worklist.
+ #
+ # The fei might be a string fei (result of fei.to_storage_id), a
+ # FlowExpressionId instance or a hash.
+ #
+ def workitem (fei)
+
+ fexp = Ruote::Exp::FlowExpression.fetch(
+ @context, Ruote::FlowExpressionId.extract_h(fei))
+
+ Ruote::Workitem.new(fexp.h.applied_workitem)
end
end
#
# A wrapper class giving easy access to engine variables.