lib/ruote/engine.rb in ruote-2.2.0 vs lib/ruote/engine.rb in ruote-2.3.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -20,885 +20,20 @@
# THE SOFTWARE.
#
# Made in Japan.
#++
-require 'ruote/context'
-require 'ruote/engine/process_status'
-require 'ruote/receiver/base'
+require 'ruote/dashboard'
module Ruote
#
- # This class holds the 'engine' name, perhaps 'dashboard' would have been
- # a better name. Anyway, the methods here allow to launch processes
- # and to query about their status. There are also methods for fixing
- # issues with stalled processes or processes stuck in errors.
+ # This class has been replaced by Ruote::Dashboard.
#
- # NOTE : the methods #launch and #reply are implemented in
- # Ruote::ReceiverMixin (this Engine class has all the methods of a Receiver).
+ # It will be slowly phased out as documentation and tutorials move
+ # from Ruote::Engine to Ruote::Dashboard.
#
- class Engine
-
- include ReceiverMixin
-
- attr_reader :context
- attr_reader :variables
-
- # Creates an engine using either worker or storage.
- #
- # If a storage instance is given as the first argument, the engine will be
- # able to manage processes (for example, launch and cancel workflows) but
- # will not actually run any workflows.
- #
- # If a worker instance is given as the first argument and the second
- # argument is true, engine will start the worker and will be able to both
- # manage and run workflows.
- #
- # If the second options is set to { :join => true }, the worker wil
- # be started and run in the current thread.
- #
- def initialize(worker_or_storage, opts=true)
-
- @context = worker_or_storage.context
- @context.engine = self
-
- @variables = EngineVariables.new(@context.storage)
-
- if @context.worker
- if opts == true
- @context.worker.run_in_thread
- # runs worker in its own thread
- elsif opts == { :join => true }
- @context.worker.run
- # runs worker in current thread (and doesn't return)
- #else
- # worker is not run
- end
- #else
- # no worker
- end
- end
-
- # Returns the storage this engine works with passed at engine
- # initialization.
- #
- def storage
-
- @context.storage
- end
-
- # Returns the worker nested inside this engine (passed at initialization).
- # Returns nil if this engine is only linked to a storage (and the worker
- # is running somewhere else (hopefully)).
- #
- def worker
-
- @context.worker
- end
-
- # A shortcut for engine.context.history
- #
- def history
-
- @context.history
- end
-
- # Quick note : the implementation of launch is found in the module
- # Ruote::ReceiverMixin that the engine includes.
- #
- # Some processes have to have one and only one instance of themselves
- # running, these are called 'singles' ('singleton' is too object-oriented).
- #
- # When called, this method will check if an instance of the pdef is
- # already running (it uses the process definition name attribute), if
- # yes, it will return without having launched anything. If there is no
- # such process running, it will launch it (and register it).
- #
- # Returns the wfid (workflow instance id) of the running single.
- #
- def launch_single(process_definition, fields={}, variables={})
-
- tree = @context.reader.read(process_definition)
- name = tree[1]['name'] || (tree[1].find { |k, v| v.nil? } || []).first
-
- raise ArgumentError.new(
- 'process definition is missing a name, cannot launch as single'
- ) unless name
-
- singles = @context.storage.get('variables', 'singles') || {
- '_id' => 'singles', 'type' => 'variables', 'h' => {}
- }
- wfid, timestamp = singles['h'][name]
-
- return wfid if wfid && (ps(wfid) || Time.now.to_f - timestamp < 1.0)
- # return wfid if 'singleton' process is already running
-
- wfid = @context.wfidgen.generate
-
- singles['h'][name] = [ wfid, Time.now.to_f ]
-
- r = @context.storage.put(singles)
-
- return launch_single(tree, fields, variables) unless r.nil?
- #
- # the put failed, back to the start...
- #
- # all this to prevent races between multiple engines,
- # multiple launch_single calls (from different Ruby runtimes)
-
- # ... green for launch
-
- @context.storage.put_msg(
- 'launch',
- 'wfid' => wfid,
- 'tree' => tree,
- 'workitem' => { 'fields' => fields },
- 'variables' => variables)
-
- wfid
- end
-
- # Given a workitem or a fei, will do a cancel_expression,
- # else it's a wfid and it does a cancel_process.
- #
- def cancel(wi_or_fei_or_wfid)
-
- target = Ruote.extract_id(wi_or_fei_or_wfid)
-
- if target.is_a?(String)
- @context.storage.put_msg('cancel_process', 'wfid' => target)
- else
- @context.storage.put_msg('cancel', 'fei' => target)
- end
- end
-
- alias cancel_process cancel
- alias cancel_expression cancel
-
- # Given a workitem or a fei, will do a kill_expression,
- # else it's a wfid and it does a kill_process.
- #
- def kill(wi_or_fei_or_wfid)
-
- target = Ruote.extract_id(wi_or_fei_or_wfid)
-
- if target.is_a?(String)
- @context.storage.put_msg('kill_process', 'wfid' => target)
- else
- @context.storage.put_msg('cancel', 'fei' => target, 'flavour' => 'kill')
- end
- end
-
- alias kill_process kill
- alias kill_expression kill
-
- # Replays at a given error (hopefully you fixed the cause of the error
- # before replaying...)
- #
- def replay_at_error(err)
-
- msg = err.msg.dup
- action = msg.delete('action')
-
- msg['replay_at_error'] = true
- # just an indication
-
- if msg['tree'] && fei = msg['fei']
- #
- # nukes the expression in case of [re]apply
- #
- exp = Ruote::Exp::FlowExpression.fetch(@context, fei)
- exp.unpersist_or_raise if exp
- end
-
- @context.storage.delete(err.to_h) # remove error
-
- @context.storage.put_msg(action, msg) # trigger replay
- end
-
- # Re-applies an expression (given via its FlowExpressionId).
- #
- # That will cancel the expression and, once the cancel operation is over
- # (all the children have been cancelled), the expression will get
- # re-applied.
- #
- # == options
- #
- # :tree is used to completely change the tree of the expression at re_apply
- #
- # engine.re_apply(fei, :tree => [ 'participant', { 'ref' => 'bob' }, [] ])
- #
- # :fields is used to replace the fields of the workitem at re_apply
- #
- # engine.re_apply(fei, :fields => { 'customer' => 'bob' })
- #
- # :merge_in_fields is used to add / override fields
- #
- # engine.re_apply(fei, :merge_in_fields => { 'customer' => 'bob' })
- #
- def re_apply(fei, opts={})
-
- @context.storage.put_msg('cancel', 'fei' => fei.to_h, 're_apply' => opts)
- end
-
- # Returns a ProcessStatus instance describing the current status of
- # a process instance.
- #
- def process(wfid)
-
- statuses([ wfid ], {}).first
- end
-
- # Returns an array of ProcessStatus instances.
- #
- # WARNING : this is an expensive operation, but it understands :skip
- # and :limit, so pagination is our friend.
- #
- # Please note, if you're interested only in processes that have errors,
- # Engine#errors is a more efficient means.
- #
- # To simply list the wfids of the currently running, Engine#process_wfids
- # is way cheaper to call.
- #
- def processes(opts={})
-
- wfids = @context.storage.expression_wfids(opts)
-
- opts[:count] ? wfids.size : statuses(wfids, opts)
- end
-
- # Returns a list of processes or the process status of a given process
- # instance.
- #
- def ps(wfid=nil)
-
- wfid == nil ? processes : process(wfid)
- end
-
- # Returns an array of current errors (hashes)
- #
- # Can be called in two ways :
- #
- # engine.errors(wfid)
- #
- # and
- #
- # engine.errors(:skip => 100, :limit => 100)
- #
- def errors(wfid=nil)
-
- wfid, options = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ]
-
- errs = wfid.nil? ?
- @context.storage.get_many('errors', nil, options) :
- @context.storage.get_many('errors', wfid)
-
- return errs if options[:count]
-
- errs.collect { |err| ProcessError.new(err) }
- end
-
- # Returns an array of schedules. Those schedules are open structs
- # with various properties, like target, owner, at, put_at, ...
- #
- # Introduced mostly for ruote-kit.
- #
- # Can be called in two ways :
- #
- # engine.schedules(wfid)
- #
- # and
- #
- # engine.schedules(:skip => 100, :limit => 100)
- #
- def schedules(wfid=nil)
-
- wfid, options = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ]
-
- scheds = wfid.nil? ?
- @context.storage.get_many('schedules', nil, options) :
- @context.storage.get_many('schedules', /!#{wfid}-\d+$/)
-
- return scheds if options[:count]
-
- scheds.collect { |s| Ruote.schedule_to_h(s) }.sort_by { |s| s['wfid'] }
- end
-
- # Returns a [sorted] list of wfids of the process instances currently
- # running in the engine.
- #
- # This operation is substantially less costly than Engine#processes (though
- # the 'how substantially' depends on the storage chosen).
- #
- def process_ids
-
- @context.storage.expression_wfids({})
- end
-
- alias process_wfids process_ids
-
- # Warning : expensive operation.
- #
- # Leftovers are workitems, errors and schedules belonging to process
- # instances for which there are no more expressions left.
- #
- # Better delete them or investigate why they are left here.
- #
- # The result is a list of documents (hashes) as found in the storage. Each
- # of them might represent a workitem, an error or a schedule.
- #
- # If you want to delete one of them you can do
- #
- # engine.storage.delete(doc)
- #
- def leftovers
-
- wfids = @context.storage.expression_wfids({})
-
- wis = @context.storage.get_many('workitems').compact
- ers = @context.storage.get_many('errors').compact
- scs = @context.storage.get_many('schedules').compact
- # some slow storages need the compaction... [c]ouch...
-
- (wis + ers + scs).reject { |doc| wfids.include?(doc['fei']['wfid']) }
- end
-
- # Shuts down the engine, mostly passes the shutdown message to the other
- # services and hope they'll shut down properly.
- #
- def shutdown
-
- @context.shutdown
- end
-
- # This method expects there to be a logger with a wait_for method in the
- # context, else it will raise an exception.
- #
- # *WARNING* : wait_for() is meant for environments where there is a unique
- # worker and that worker is nested in this engine. In a multiple worker
- # environment wait_for doesn't see events handled by 'other' workers.
- #
- # This method is only useful for test/quickstart/examples environments.
- #
- # engine.wait_for(:alpha)
- # # will make the current thread block until a workitem is delivered
- # # to the participant named 'alpha'
- #
- # engine.wait_for('123432123-9043')
- # # will make the current thread block until the processed whose
- # # wfid is given (String) terminates or produces an error.
- #
- # engine.wait_for(5)
- # # will make the current thread block until 5 messages have been
- # # processed on the workqueue...
- #
- # engine.wait_for(:empty)
- # # will return as soon as the engine/storage is empty, ie as soon
- # # as there are no more processes running in the engine (no more
- # # expressions placed in the storage)
- #
- # It's OK to wait for multiple wfids :
- #
- # engine.wait_for('20100612-bezerijozo', '20100612-yakisoba')
- #
- def wait_for(*items)
-
- logger = @context['s_logger']
-
- raise(
- "can't wait_for, there is no logger that responds to that call"
- ) unless logger.respond_to?(:wait_for)
-
- logger.wait_for(items)
- end
-
- # Joins the worker thread. If this engine has no nested worker, calling
- # this method will simply return immediately.
- #
- def join
-
- worker.join if worker
- end
-
- # Loads (and turns into a tree) the process definition at the given path.
- #
- def load_definition(path)
-
- @context.reader.read(path)
- end
-
- # Registers a participant in the engine.
- #
- # Takes the form
- #
- # engine.register_participant name_or_regex, klass, opts={}
- #
- # With the form
- #
- # engine.register_participant name_or_regex do |workitem|
- # # ...
- # end
- #
- # A BlockParticipant is automatically created.
- #
- #
- # == name or regex
- #
- # When registering participants, strings or regexes are accepted. Behind
- # the scenes, a regex is kept.
- #
- # Passing a string like "alain" will get ruote to automatically turn it
- # into the following regex : /^alain$/.
- #
- # For finer control over this, pass a regex directly
- #
- # engine.register_participant /^user-/, MyParticipant
- # # will match all workitems whose participant name starts with "user-"
- #
- #
- # == some examples
- #
- # engine.register_participant 'compute_sum' do |wi|
- # wi.fields['sum'] = wi.fields['articles'].inject(0) do |s, (c, v)|
- # s + c * v # sum + count * value
- # end
- # # a block participant implicitely replies to the engine immediately
- # end
- #
- # class MyParticipant
- # def initialize(opts)
- # @name = opts['name']
- # end
- # def consume(workitem)
- # workitem.fields['rocket_name'] = @name
- # send_to_the_moon(workitem)
- # end
- # def cancel(fei, flavour)
- # # do nothing
- # end
- # end
- #
- # engine.register_participant(
- # /^moon-.+/, MyParticipant, 'name' => 'Saturn-V')
- #
- # # computing the total for a invoice being passed in the workitem.
- # #
- # class TotalParticipant
- # include Ruote::LocalParticipant
- #
- # def consume(workitem)
- # workitem['total'] = workitem.fields['items'].inject(0.0) { |t, item|
- # t + item['count'] * PricingService.lookup(item['id'])
- # }
- # reply_to_engine(workitem)
- # end
- # end
- # engine.register_participant 'total', TotalParticipant
- #
- # Remember that the options (the hash that follows the class name), must be
- # serializable via JSON.
- #
- #
- # == require_path and load_path
- #
- # It's OK to register a participant by passing its full classname as a
- # String.
- #
- # engine.register_participant(
- # 'auditor', 'AuditParticipant', 'require_path' => 'part/audit.rb')
- # engine.register_participant(
- # 'auto_decision', 'DecParticipant', 'load_path' => 'part/dec.rb')
- #
- # Note the option load_path / require_path that point to the ruby file
- # containing the participant implementation. 'require' will load and eval
- # the ruby code only once, 'load' each time.
- #
- def register_participant(regex, participant=nil, opts={}, &block)
-
- if participant.is_a?(Hash)
- opts = participant
- participant = nil
- end
-
- pa = @context.plist.register(regex, participant, opts, block)
-
- @context.storage.put_msg(
- 'participant_registered',
- 'regex' => regex.is_a?(Regexp) ? regex.inspect : regex.to_s)
-
- pa
- end
-
- # A shorter version of #register_participant
- #
- # engine.register 'alice', MailParticipant, :target => 'alice@example.com'
- #
- # or a block registering mechanism.
- #
- # engine.register do
- # alpha 'Participants::Alpha', 'flavour' => 'vanilla'
- # participant 'bravo', 'Participants::Bravo', :flavour => 'peach'
- # catchall ParticipantCharlie, 'flavour' => 'coconut'
- # end
- #
- # Originally implemented in ruote-kit by Torsten Schoenebaum.
- #
- def register(*args, &block)
-
- if args.size > 0
- register_participant(*args, &block)
- else
- proxy = ParticipantRegistrationProxy.new(self)
- block.arity < 1 ? proxy.instance_eval(&block) : block.call(proxy)
- end
- end
-
- # Removes/unregisters a participant from the engine.
- #
- def unregister_participant(name_or_participant)
-
- re = @context.plist.unregister(name_or_participant)
-
- raise(ArgumentError.new('participant not found')) unless re
-
- @context.storage.put_msg(
- 'participant_unregistered',
- 'regex' => re.to_s)
- end
-
- alias :unregister :unregister_participant
-
- # Returns a list of Ruote::ParticipantEntry instances.
- #
- # engine.register_participant :alpha, MyParticipant, 'message' => 'hello'
- #
- # # interrogate participant list
- # #
- # list = engine.participant_list
- # participant = list.first
- # p participant.regex
- # # => "^alpha$"
- # p participant.classname
- # # => "MyParticipant"
- # p participant.options
- # # => {"message"=>"hello"}
- #
- # # update participant list
- # #
- # participant.regex = '^alfred$'
- # engine.participant_list = list
- #
- def participant_list
-
- @context.plist.list
- end
-
- # Accepts a list of Ruote::ParticipantEntry instances or a list of
- # [ regex, [ classname, opts ] ] arrays.
- #
- # See Engine#participant_list
- #
- # Some examples :
- #
- # engine.participant_list = [
- # [ '^charly$', [ 'Ruote::StorageParticipant', {} ] ],
- # [ '.+', [ 'MyDefaultParticipant', { 'default' => true } ]
- # ]
- #
- # This method writes the participant list in one go, it might be easier to
- # use than to register participant one by ones.
- #
- def participant_list=(pl)
-
- @context.plist.list = pl
- end
-
- # A convenience method for
- #
- # sp = Ruote::StorageParticipant.new(engine)
- #
- # simply do
- #
- # sp = engine.storage_participant
- #
- def storage_participant
-
- @storage_participant ||= Ruote::StorageParticipant.new(self)
- end
-
- # Returns an instance of the participant registered under the given name.
- # Returns nil if there is no participant registered for that name.
- #
- def participant(name)
-
- @context.plist.lookup(name, nil)
- end
-
- # Adds a service locally (will not get propagated to other workers).
- #
- # tracer = Tracer.new
- # @engine.add_service('tracer', tracer)
- #
- # or
- #
- # @engine.add_service('tracer', 'ruote/exp/tracer', 'Ruote::Exp::Tracer')
- #
- # This method returns the service instance it just bound.
- #
- def add_service(name, path_or_instance, classname=nil, opts=nil)
-
- @context.add_service(name, path_or_instance, classname, opts)
- end
-
- # Sets a configuration option. Examples:
- #
- # # allow remote workflow definitions (for subprocesses or when launching
- # # processes)
- # @engine.configure('remote_definition_allowed', true)
- #
- # # allow ruby_eval
- # @engine.configure('ruby_eval_allowed', true)
- #
- def configure(config_key, value)
-
- @context[config_key] = value
- end
-
- # Returns a configuration value.
- #
- # engine.configure('ruby_eval_allowed', true)
- #
- # p engine.configuration('ruby_eval_allowed')
- # # => true
- #
- def configuration(config_key)
-
- @context[config_key]
- end
-
- # Returns the process tree that is triggered in case of error.
- #
- # Note that this 'on_error' doesn't trigger if an on_error is defined
- # in the process itself.
- #
- # Returns nil if there is no 'on_error' set.
- #
- def on_error
-
- @context.storage.get_trackers['trackers']['on_error']['msg']['tree']
-
- rescue
- nil
- end
-
- # Returns the process tree that is triggered in case of process termination.
- #
- # Note that a termination process doesn't raise a termination process when
- # it terminates itself.
- #
- # Returns nil if there is no 'on_terminate' set.
- #
- def on_terminate
-
- @context.storage.get_trackers['trackers']['on_terminate']['msg']['tree']
-
- rescue
- nil
- end
-
- # Sets a participant or subprocess to be triggered when an error occurs
- # in a process instance.
- #
- # engine.on_error = participant_name
- #
- # engine.on_error = subprocess_name
- #
- # engine.on_error = Ruote.process_definition do
- # alpha
- # end
- #
- # Note that this 'on_error' doesn't trigger if an on_error is defined
- # in the process itself.
- #
- def on_error=(target)
-
- @context.tracker.add_tracker(
- nil, # do not track a specific wfid
- 'error_intercepted', # react on 'error_intercepted' msgs
- 'on_error', # the identifier
- nil, # no specific condition
- { 'action' => 'launch',
- 'wfid' => 'replace',
- 'tree' => target.is_a?(String) ?
- [ 'define', {}, [ [ target, {}, [] ] ] ] : target,
- 'workitem' => 'replace',
- 'variables' => 'compile' })
- end
-
- # Sets a participant or a subprocess that is to be launched/called whenever
- # a regular process terminates.
- #
- # engine.on_terminate = participant_name
- #
- # engine.on_terminate = subprocess_name
- #
- # engine.on_terminate = Ruote.define do
- # alpha
- # bravo
- # end
- #
- # Note that a termination process doesn't raise a termination process when
- # it terminates itself.
- #
- # on_terminate processes are not triggered for on_error processes.
- # on_error processes are triggered for on_terminate processes as well.
- #
- def on_terminate=(target)
-
- @context.tracker.add_tracker(
- nil, # do not track a specific wfid
- 'terminated', # react on 'error_intercepted' msgs
- 'on_terminate', # the identifier
- nil, # no specific condition
- { 'action' => 'launch',
- 'tree' => target.is_a?(String) ?
- [ 'define', {}, [ [ target, {}, [] ] ] ] : target,
- 'workitem' => 'replace' })
- end
-
- # A debug helper :
- #
- # engine.noisy = true
- #
- # will let the engine (in fact the worker) pour all the details of the
- # executing process instances to STDOUT.
- #
- def noisy=(b)
-
- @context.logger.noisy = b
- end
-
- protected
-
- # Used by #process and #processes
- #
- def statuses(wfids, opts)
-
- swfids = wfids.collect { |wfid| /!#{wfid}-\d+$/ }
-
- exps = @context.storage.get_many('expressions', wfids).compact
- swis = @context.storage.get_many('workitems', wfids).compact
- errs = @context.storage.get_many('errors', wfids).compact
- schs = @context.storage.get_many('schedules', swfids).compact
- # some slow storages need the compaction... couch...
-
- errs = errs.collect { |err| ProcessError.new(err) }
- schs = schs.collect { |sch| Ruote.schedule_to_h(sch) }
-
- by_wfid = {}
-
- exps.each do |exp|
- (by_wfid[exp['fei']['wfid']] ||= [ [], [], [], [] ])[0] << exp
- end
- swis.each do |swi|
- (by_wfid[swi['fei']['wfid']] ||= [ [], [], [], [] ])[1] << swi
- end
- errs.each do |err|
- (by_wfid[err.wfid] ||= [ [], [], [], [] ])[2] << err
- end
- schs.each do |sch|
- (by_wfid[sch['wfid']] ||= [ [], [], [], [] ])[3] << sch
- end
-
- wfids = by_wfid.keys.sort
- wfids = wfids.reverse if opts[:descending]
- # re-adjust list of wfids, only take what was found
-
- wfids.inject([]) { |a, wfid|
- info = by_wfid[wfid]
- a << ProcessStatus.new(@context, *info) if info
- a
- }
- end
- end
-
- #
- # A wrapper class giving easy access to engine variables.
- #
- # There is one instance of this class for an Engine instance. It is
- # returned when calling Engine#variables.
- #
- class EngineVariables
-
- def initialize(storage)
-
- @storage = storage
- end
-
- def [](k)
-
- @storage.get_engine_variable(k)
- end
-
- def []=(k, v)
-
- @storage.put_engine_variable(k, v)
- end
- end
-
- #
- # Engine#register uses this proxy when it's passed a block.
- #
- # Originally written by Torsten Schoenebaum for ruote-kit.
- #
- class ParticipantRegistrationProxy
-
- def initialize(engine)
-
- @engine = engine
- end
-
- def participant(name, klass=nil, options={}, &block)
-
- @engine.register_participant(name, klass, options, &block)
- end
-
- def catchall(*args)
-
- klass = args.empty? ? Ruote::StorageParticipant : args.first
- options = args[1] || {}
-
- participant('.+', klass, options)
- end
-
- # Maybe a bit audacious...
- #
- def method_missing(method_name, *args)
-
- participant(method_name, *args)
- end
- end
-
- # Refines a schedule as found in the ruote storage into something a bit
- # easier to present.
- #
- def self.schedule_to_h(sched)
-
- h = sched.dup
-
- h.delete('_rev')
- h.delete('type')
- msg = h.delete('msg')
- owner = h.delete('owner')
-
- h['wfid'] = owner['wfid']
- h['action'] = msg['action']
- h['type'] = msg['flavour']
- h['owner'] = Ruote::FlowExpressionId.new(owner)
- h['target'] = Ruote::FlowExpressionId.new(msg['fei'])
-
- h
+ class Engine < Dashboard
end
end