lib/ruote/engine.rb in ruote-2.1.10 vs lib/ruote/engine.rb in ruote-2.1.11
- old
+ new
@@ -34,56 +34,157 @@
# a better name. Anyway, the methods here allow to launch processes
# and to query about their status. There are also methods for fixing
# issues with stalled processes or processes stuck in errors.
#
# NOTE : the methods #launch and #reply are implemented in
- # Ruote::ReceiverMixin
+ # Ruote::ReceiverMixin (this Engine class has all the methods of a Receiver).
#
class Engine
include ReceiverMixin
attr_reader :context
attr_reader :variables
- def initialize (worker_or_storage, run=true)
+ # Creates an engine using either worker or storage.
+ #
+ # If a storage instance is given as the first argument, the engine will be
+ # able to manage processes (for example, launch and cancel workflows) but
+ # will not actually run any workflows.
+ #
+ # If a worker instance is given as the first argument and the second
+ # argument is true, engine will start the worker and will be able to both
+ # manage and run workflows.
+ #
+ # If the second options is set to { :join => true }, the worker wil
+ # be started and run in the current thread.
+ #
+ def initialize (worker_or_storage, opts=true)
@context = worker_or_storage.context
@context.engine = self
@variables = EngineVariables.new(@context.storage)
- @context.worker.run_in_thread if @context.worker && run
- # launch the worker if there is one
+ if @context.worker
+ if opts == true
+ @context.worker.run_in_thread
+ # runs worker in its own thread
+ elsif opts == { :join => true }
+ @context.worker.run
+ # runs worker in current thread (and doesn't return)
+ #else
+ # worker is not run
+ end
+ #else
+ # no worker
+ end
end
+ # Returns the storage this engine works with passed at engine
+ # initialization.
+ #
def storage
@context.storage
end
+ # Returns the worker nested inside this engine (passed at initialization).
+ # Returns nil if this engine is only linked to a storage (and the worker
+ # is running somewhere else (hopefully)).
+ #
def worker
@context.worker
end
+ # Quick note : the implementation of launch is found in the module
+ # Ruote::ReceiverMixin that the engine includes.
+ #
+ # Some processes have to have one and only one instance of themselves
+ # running, these are called 'singles' ('singleton' is too object-oriented).
+ #
+ # When called, this method will check if an instance of the pdef is
+ # already running (it uses the process definition name attribute), if
+ # yes, it will return without having launched anything. If there is no
+ # such process running, it will launch it (and register it).
+ #
+ # Returns the wfid (workflow instance id) of the running single.
+ #
+ def launch_single (process_definition, fields={}, variables={})
+
+ tree = @context.parser.parse(process_definition)
+ name = tree[1]['name'] || (tree[1].find { |k, v| v.nil? } || []).first
+
+ raise ArgumentError.new(
+ 'process definition is missing a name, cannot launch as single'
+ ) unless name
+
+ singles = @context.storage.get('variables', 'singles') || {
+ '_id' => 'singles', 'type' => 'variables', 'h' => {}
+ }
+ wfid, timestamp = singles['h'][name]
+
+ if wfid && (timestamp + 1.0 < Time.now.to_f || process(wfid) != nil)
+ return wfid
+ end
+ # process is already running
+
+ wfid = @context.wfidgen.generate
+
+ singles['h'][name] = [ wfid, Time.now.to_f ]
+
+ r = @context.storage.put(singles)
+
+ return launch_single(tree, fields, variables) unless r.nil?
+ #
+ # the put failed, back to the start...
+ #
+ # all this to prevent races between multiple engines,
+ # multiple launch_single calls (from different Ruby runtimes)
+
+ # ... green for launch
+
+ @context.storage.put_msg(
+ 'launch',
+ 'wfid' => wfid,
+ 'tree' => tree,
+ 'workitem' => { 'fields' => fields },
+ 'variables' => variables)
+
+ wfid
+ end
+
+ # Given a process identifier (wfid), cancels this process.
+ #
def cancel_process (wfid)
@context.storage.put_msg('cancel_process', 'wfid' => wfid)
end
+ # Given a process identifier (wfid), kills this process. Killing is
+ # equivalent to cancelling, but when killing, :on_cancel attributes
+ # are not triggered.
+ #
def kill_process (wfid)
@context.storage.put_msg('kill_process', 'wfid' => wfid)
end
+ # Cancels a segment of process instance. Since expressions are nodes in
+ # processes instances, cancelling an expression, will cancel the expression
+ # and all its children (the segment of process).
+ #
def cancel_expression (fei)
fei = fei.to_h if fei.respond_to?(:to_h)
@context.storage.put_msg('cancel', 'fei' => fei)
end
+ # Like #cancel_expression, but :on_cancel attributes (of the expressions)
+ # are not triggered.
+ #
def kill_expression (fei)
fei = fei.to_h if fei.respond_to?(:to_h)
@context.storage.put_msg('cancel', 'fei' => fei, 'flavour' => 'kill')
end
@@ -140,58 +241,115 @@
# Returns a ProcessStatus instance describing the current status of
# a process instance.
#
def process (wfid)
- exps = @context.storage.get_many('expressions', /!#{wfid}$/)
- errs = self.errors( wfid )
-
- return nil if exps.empty? && errs.empty?
-
- ProcessStatus.new(@context, exps, errs)
+ list_processes([ wfid ], {}).first
end
# Returns an array of ProcessStatus instances.
#
- # WARNING : this is an expensive operation.
+ # WARNING : this is an expensive operation, but it understands :skip
+ # and :limit, so pagination is our friend.
#
- def processes
+ # Please note, if you're interested only in processes that have errors,
+ # Engine#errors is a more efficient means.
+ #
+ # To simply list the wfids of the currently running, Engine#process_wfids
+ # is way cheaper to call.
+ #
+ def processes (opts={})
- exps = @context.storage.get_many('expressions')
- errs = self.errors
+ wfids = nil
- by_wfid = {}
+ if opts.size > 0
- exps.each do |exp|
- (by_wfid[exp['fei']['wfid']] ||= [ [], [] ]).first << exp
+ wfids = @context.storage.expression_wfids(opts)
+
+ return wfids.size if opts[:count]
end
- errs.each do |err|
- (by_wfid[err['msg']['fei']['wfid']] ||= [ [], [] ]).last << err
- end
- by_wfid.values.collect { |xs, rs| ProcessStatus.new(@context, xs, rs) }
+ list_processes(wfids, opts)
end
# Returns an array of current errors (hashes)
#
- def errors( wfid = nil )
- wfid.nil? ?
- @context.storage.get_many('errors') :
- @context.storage.get_many('errors', /!#{wfid}$/)
+ # Can be called in two ways :
+ #
+ # engine.errors(wfid)
+ #
+ # and
+ #
+ # engine.errors(:skip => 100, :limit => 100)
+ #
+ def errors (wfid=nil)
+
+ wfid, options = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ]
+
+ errs = wfid.nil? ?
+ @context.storage.get_many('errors', nil, options) :
+ @context.storage.get_many('errors', wfid)
+
+ return errs if options[:count]
+
+ errs.collect { |err| ProcessError.new(err) }
end
+ # Returns an array of schedules. Those schedules are open structs
+ # with various properties, like target, owner, at, put_at, ...
+ #
+ # Introduced mostly for ruote-kit.
+ #
+ # Can be called in two ways :
+ #
+ # engine.schedules(wfid)
+ #
+ # and
+ #
+ # engine.schedules(:skip => 100, :limit => 100)
+ #
+ def schedules (wfid=nil)
+
+ wfid, options = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ]
+
+ scheds = wfid.nil? ?
+ @context.storage.get_many('schedules', nil, options) :
+ @context.storage.get_many('schedules', /!#{wfid}-\d+$/)
+
+ return scheds if options[:count]
+
+ scheds.collect { |sched| Ruote.schedule_to_h(sched) }
+ end
+
+ # Returns a [sorted] list of wfids of the process instances currently
+ # running in the engine.
+ #
+ # This operation is substantially less costly than Engine#processes (though
+ # the 'how substantially' depends on the storage chosen).
+ #
+ def process_wfids
+
+ @context.storage.ids('expressions').collect { |sfei|
+ sfei.split('!').last
+ }.uniq.sort
+ end
+
# Shuts down the engine, mostly passes the shutdown message to the other
# services and hope they'll shut down properly.
#
def shutdown
@context.shutdown
end
- # This method expects there is a logger with a wait_for method in the
+ # This method expects there to be a logger with a wait_for method in the
# context, else it will raise an exception.
#
+ # *WARNING* : wait_for() is meant for environments where there is a unique
+ # worker and that worker is nested in this engine. In a multiple worker
+ # environment wait_for doesn't see events handled by 'other' workers.
+ #
# This method is only useful for test/quickstart/examples environments.
#
# engine.wait_for(:alpha)
# # will make the current thread block until a workitem is delivered
# # to the participant named 'alpha'
@@ -222,10 +380,18 @@
) unless logger.respond_to?(:wait_for)
logger.wait_for(items)
end
+ # Joins the worker thread. If this engine has no nested worker, calling
+ # this method will simply return immediately.
+ #
+ def join
+
+ worker.join if worker
+ end
+
# Loads and parses the process definition at the given path.
#
def load_definition (path)
@context.parser.parse(path)
@@ -291,15 +457,31 @@
# def cancel (fei, flavour)
# # do nothing
# end
# end
#
- # engine.register_participant 'moon', MyStatelessParticipant, 'name' => 'saturn5'
+ # engine.register_participant(
+ # 'moon', MyStatelessParticipant, 'name' => 'saturn5')
#
# Remember that the options (the hash that follows the class name), must be
# serialisable via JSON.
#
+ #
+ # == require_path and load_path
+ #
+ # It's OK to register a participant by passing its full classname as a
+ # String.
+ #
+ # engine.register_participant(
+ # 'auditor', 'AuditParticipant', 'require_path' => 'part/audit.rb')
+ # engine.register_participant(
+ # 'auto_decision', 'DecParticipant', 'load_path' => 'part/dec.rb')
+ #
+ # Note the option load_path / require_path that point to the ruby file
+ # containing the participant implementation. 'require' will load and eval
+ # the ruby code only once, 'load' each time.
+ #
def register_participant (regex, participant=nil, opts={}, &block)
pa = @context.plist.register(regex, participant, opts, block)
@context.storage.put_msg(
@@ -308,10 +490,34 @@
'engine_worker_only' => (pa != nil))
pa
end
+ # A shorter version of #register_participant
+ #
+ # engine.register 'alice', MailParticipant, :target => 'alice@example.com'
+ #
+ # or a block registering mechanism.
+ #
+ # engine.register do
+ # alpha 'Participants::Alpha', 'flavour' => 'vanilla'
+ # participant 'bravo', 'Participants::Bravo', :flavour => 'peach'
+ # catchall ParticipantCharlie, 'flavour' => 'coconut'
+ # end
+ #
+ # Originally implemented in ruote-kit by Torsten Schoenebaum.
+ #
+ def register (*args, &block)
+
+ if args.size > 0
+ register_participant(*args, &block)
+ else
+ proxy = ParticipantRegistrationProxy.new(self)
+ block.arity < 1 ? proxy.instance_eval(&block) : block.call(proxy)
+ end
+ end
+
# Removes/unregisters a participant from the engine.
#
def unregister_participant (name_or_participant)
re = @context.plist.unregister(name_or_participant)
@@ -321,10 +527,59 @@
@context.storage.put_msg(
'participant_unregistered',
'regex' => re.to_s)
end
+ alias :unregister :unregister_participant
+
+ # Returns a list of Ruote::ParticipantEntry instances.
+ #
+ # engine.register_participant :alpha, MyParticipant, 'message' => 'hello'
+ #
+ # # interrogate participant list
+ # #
+ # list = engine.participant_list
+ # participant = list.first
+ # p participant.regex
+ # # => "^alpha$"
+ # p participant.classname
+ # # => "MyParticipant"
+ # p participant.options
+ # # => {"message"=>"hello"}
+ #
+ # # update participant list
+ # #
+ # participant.regex = '^alfred$'
+ # engine.participant_list = list
+ #
+ def participant_list
+
+ @context.plist.list
+ end
+
+ # Accepts a list of Ruote::ParticipantEntry instances.
+ #
+ # See Engine#participant_list
+ #
+ def participant_list= (pl)
+
+ @context.plist.list = pl
+ end
+
+ # A convenience method for
+ #
+ # sp = Ruote::StorageParticipant.new(engine)
+ #
+ # simply do
+ #
+ # sp = engine.storage_participant
+ #
+ def storage_participant
+
+ @storage_participant ||= Ruote::StorageParticipant.new(self)
+ end
+
# Adds a service locally (will not get propagated to other workers).
#
# tracer = Tracer.new
# @engine.add_service('tracer', tracer)
#
@@ -371,10 +626,68 @@
fexp = Ruote::Exp::FlowExpression.fetch(
@context, Ruote::FlowExpressionId.extract_h(fei))
Ruote::Workitem.new(fexp.h.applied_workitem)
end
+
+ # A debug helper :
+ #
+ # engine.noisy = true
+ #
+ # will let the engine (in fact the worker) pour all the details of the
+ # executing process instances to STDOUT.
+ #
+ def noisy= (b)
+
+ @context.logger.noisy = b
+ end
+
+ protected
+
+ # Used by #process and #processes
+ #
+ def list_processes (wfids, opts)
+
+ swfids = wfids ? wfids.collect { |wfid| /!#{wfid}-\d+$/ } : nil
+
+ exps = @context.storage.get_many('expressions', wfids)
+ swis = @context.storage.get_many('workitems', wfids)
+ errs = @context.storage.get_many('errors', wfids)
+ schs = @context.storage.get_many('schedules', swfids)
+
+ errs = errs.collect { |err| ProcessError.new(err) }
+ schs = schs.collect { |sch| Ruote.schedule_to_h(sch) }
+
+ by_wfid = {}
+
+ exps.each do |exp|
+ (by_wfid[exp['fei']['wfid']] ||= [ [], [], [], [] ])[0] << exp
+ end
+ swis.each do |swi|
+ (by_wfid[swi['fei']['wfid']] ||= [ [], [], [], [] ])[1] << swi
+ end
+ errs.each do |err|
+ (by_wfid[err.wfid] ||= [ [], [], [], [] ])[2] << err
+ end
+ schs.each do |sch|
+ (by_wfid[sch['wfid']] ||= [ [], [], [], [] ])[3] << sch
+ end
+
+ wfids = if wfids
+ wfids
+ else
+ wfids = by_wfid.keys.sort
+ wfids = wfids.reverse if opts[:descending]
+ wfids
+ end
+
+ wfids.inject([]) { |a, wfid|
+ info = by_wfid[wfid]
+ a << ProcessStatus.new(@context, *info) if info
+ a
+ }
+ end
end
#
# A wrapper class giving easy access to engine variables.
#
@@ -395,8 +708,62 @@
def []= (k, v)
@storage.put_engine_variable(k, v)
end
+ end
+
+ #
+ # Engine#register uses this proxy when it's passed a block.
+ #
+ # Originally written by Torsten Schoenebaum for ruote-kit.
+ #
+ class ParticipantRegistrationProxy
+
+ def initialize (engine)
+
+ @engine = engine
+ end
+
+ def participant (name, klass, options={})
+
+ @engine.register_participant(name, klass, options)
+ end
+
+ def catchall (*args)
+
+ klass = args.empty? ? Ruote::StorageParticipant : args.first
+ options = args[1] || {}
+
+ participant('.+', klass, options)
+ end
+
+ # Maybe a bit audacious...
+ #
+ def method_missing (method_name, *args)
+
+ participant(method_name, *args)
+ end
+ end
+
+ # Refines a schedule as found in the ruote storage into something a bit
+ # easier to present.
+ #
+ def self.schedule_to_h (sched)
+
+ h = sched.dup
+
+ h.delete('_rev')
+ h.delete('type')
+ msg = h.delete('msg')
+ owner = h.delete('owner')
+
+ h['wfid'] = owner['wfid']
+ h['action'] = msg['action']
+ h['type'] = msg['flavour']
+ h['owner'] = Ruote::FlowExpressionId.new(owner)
+ h['target'] = Ruote::FlowExpressionId.new(msg['fei'])
+
+ h
end
end