lib/ruote/engine.rb in ruote-2.1.11 vs lib/ruote/engine.rb in ruote-2.2.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -56,11 +56,11 @@
# manage and run workflows.
#
# If the second options is set to { :join => true }, the worker wil
# be started and run in the current thread.
#
- def initialize (worker_or_storage, opts=true)
+ def initialize(worker_or_storage, opts=true)
@context = worker_or_storage.context
@context.engine = self
@variables = EngineVariables.new(@context.storage)
@@ -95,10 +95,17 @@
def worker
@context.worker
end
+ # A shortcut for engine.context.history
+ #
+ def history
+
+ @context.history
+ end
+
# Quick note : the implementation of launch is found in the module
# Ruote::ReceiverMixin that the engine includes.
#
# Some processes have to have one and only one instance of themselves
# running, these are called 'singles' ('singleton' is too object-oriented).
@@ -108,13 +115,13 @@
# yes, it will return without having launched anything. If there is no
# such process running, it will launch it (and register it).
#
# Returns the wfid (workflow instance id) of the running single.
#
- def launch_single (process_definition, fields={}, variables={})
+ def launch_single(process_definition, fields={}, variables={})
- tree = @context.parser.parse(process_definition)
+ tree = @context.reader.read(process_definition)
name = tree[1]['name'] || (tree[1].find { |k, v| v.nil? } || []).first
raise ArgumentError.new(
'process definition is missing a name, cannot launch as single'
) unless name
@@ -122,14 +129,12 @@
singles = @context.storage.get('variables', 'singles') || {
'_id' => 'singles', 'type' => 'variables', 'h' => {}
}
wfid, timestamp = singles['h'][name]
- if wfid && (timestamp + 1.0 < Time.now.to_f || process(wfid) != nil)
- return wfid
- end
- # process is already running
+ return wfid if wfid && (ps(wfid) || Time.now.to_f - timestamp < 1.0)
+ # return wfid if 'singleton' process is already running
wfid = @context.wfidgen.generate
singles['h'][name] = [ wfid, Time.now.to_f ]
@@ -152,49 +157,48 @@
'variables' => variables)
wfid
end
- # Given a process identifier (wfid), cancels this process.
+ # Given a workitem or a fei, will do a cancel_expression,
+ # else it's a wfid and it does a cancel_process.
#
- def cancel_process (wfid)
+ def cancel(wi_or_fei_or_wfid)
- @context.storage.put_msg('cancel_process', 'wfid' => wfid)
- end
+ target = Ruote.extract_id(wi_or_fei_or_wfid)
- # Given a process identifier (wfid), kills this process. Killing is
- # equivalent to cancelling, but when killing, :on_cancel attributes
- # are not triggered.
- #
- def kill_process (wfid)
-
- @context.storage.put_msg('kill_process', 'wfid' => wfid)
+ if target.is_a?(String)
+ @context.storage.put_msg('cancel_process', 'wfid' => target)
+ else
+ @context.storage.put_msg('cancel', 'fei' => target)
+ end
end
- # Cancels a segment of process instance. Since expressions are nodes in
- # processes instances, cancelling an expression, will cancel the expression
- # and all its children (the segment of process).
- #
- def cancel_expression (fei)
+ alias cancel_process cancel
+ alias cancel_expression cancel
- fei = fei.to_h if fei.respond_to?(:to_h)
- @context.storage.put_msg('cancel', 'fei' => fei)
- end
-
- # Like #cancel_expression, but :on_cancel attributes (of the expressions)
- # are not triggered.
+ # Given a workitem or a fei, will do a kill_expression,
+ # else it's a wfid and it does a kill_process.
#
- def kill_expression (fei)
+ def kill(wi_or_fei_or_wfid)
- fei = fei.to_h if fei.respond_to?(:to_h)
- @context.storage.put_msg('cancel', 'fei' => fei, 'flavour' => 'kill')
+ target = Ruote.extract_id(wi_or_fei_or_wfid)
+
+ if target.is_a?(String)
+ @context.storage.put_msg('kill_process', 'wfid' => target)
+ else
+ @context.storage.put_msg('cancel', 'fei' => target, 'flavour' => 'kill')
+ end
end
+ alias kill_process kill
+ alias kill_expression kill
+
# Replays at a given error (hopefully you fixed the cause of the error
# before replaying...)
#
- def replay_at_error (err)
+ def replay_at_error(err)
msg = err.msg.dup
action = msg.delete('action')
msg['replay_at_error'] = true
@@ -231,21 +235,21 @@
#
# :merge_in_fields is used to add / override fields
#
# engine.re_apply(fei, :merge_in_fields => { 'customer' => 'bob' })
#
- def re_apply (fei, opts={})
+ def re_apply(fei, opts={})
@context.storage.put_msg('cancel', 'fei' => fei.to_h, 're_apply' => opts)
end
# Returns a ProcessStatus instance describing the current status of
# a process instance.
#
- def process (wfid)
+ def process(wfid)
- list_processes([ wfid ], {}).first
+ statuses([ wfid ], {}).first
end
# Returns an array of ProcessStatus instances.
#
# WARNING : this is an expensive operation, but it understands :skip
@@ -255,22 +259,23 @@
# Engine#errors is a more efficient means.
#
# To simply list the wfids of the currently running, Engine#process_wfids
# is way cheaper to call.
#
- def processes (opts={})
+ def processes(opts={})
- wfids = nil
+ wfids = @context.storage.expression_wfids(opts)
- if opts.size > 0
+ opts[:count] ? wfids.size : statuses(wfids, opts)
+ end
- wfids = @context.storage.expression_wfids(opts)
+ # Returns a list of processes or the process status of a given process
+ # instance.
+ #
+ def ps(wfid=nil)
- return wfids.size if opts[:count]
- end
-
- list_processes(wfids, opts)
+ wfid == nil ? processes : process(wfid)
end
# Returns an array of current errors (hashes)
#
# Can be called in two ways :
@@ -279,11 +284,11 @@
#
# and
#
# engine.errors(:skip => 100, :limit => 100)
#
- def errors (wfid=nil)
+ def errors(wfid=nil)
wfid, options = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ]
errs = wfid.nil? ?
@context.storage.get_many('errors', nil, options) :
@@ -305,36 +310,62 @@
#
# and
#
# engine.schedules(:skip => 100, :limit => 100)
#
- def schedules (wfid=nil)
+ def schedules(wfid=nil)
wfid, options = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ]
scheds = wfid.nil? ?
@context.storage.get_many('schedules', nil, options) :
@context.storage.get_many('schedules', /!#{wfid}-\d+$/)
return scheds if options[:count]
- scheds.collect { |sched| Ruote.schedule_to_h(sched) }
+ scheds.collect { |s| Ruote.schedule_to_h(s) }.sort_by { |s| s['wfid'] }
end
# Returns a [sorted] list of wfids of the process instances currently
# running in the engine.
#
# This operation is substantially less costly than Engine#processes (though
# the 'how substantially' depends on the storage chosen).
#
- def process_wfids
+ def process_ids
- @context.storage.ids('expressions').collect { |sfei|
- sfei.split('!').last
- }.uniq.sort
+ @context.storage.expression_wfids({})
end
+ alias process_wfids process_ids
+
+ # Warning : expensive operation.
+ #
+ # Leftovers are workitems, errors and schedules belonging to process
+ # instances for which there are no more expressions left.
+ #
+ # Better delete them or investigate why they are left here.
+ #
+ # The result is a list of documents (hashes) as found in the storage. Each
+ # of them might represent a workitem, an error or a schedule.
+ #
+ # If you want to delete one of them you can do
+ #
+ # engine.storage.delete(doc)
+ #
+ def leftovers
+
+ wfids = @context.storage.expression_wfids({})
+
+ wis = @context.storage.get_many('workitems').compact
+ ers = @context.storage.get_many('errors').compact
+ scs = @context.storage.get_many('schedules').compact
+ # some slow storages need the compaction... [c]ouch...
+
+ (wis + ers + scs).reject { |doc| wfids.include?(doc['fei']['wfid']) }
+ end
+
# Shuts down the engine, mostly passes the shutdown message to the other
# services and hope they'll shut down properly.
#
def shutdown
@@ -369,11 +400,11 @@
#
# It's OK to wait for multiple wfids :
#
# engine.wait_for('20100612-bezerijozo', '20100612-yakisoba')
#
- def wait_for (*items)
+ def wait_for(*items)
logger = @context['s_logger']
raise(
"can't wait_for, there is no logger that responds to that call"
@@ -388,84 +419,87 @@
def join
worker.join if worker
end
- # Loads and parses the process definition at the given path.
+ # Loads (and turns into a tree) the process definition at the given path.
#
- def load_definition (path)
+ def load_definition(path)
- @context.parser.parse(path)
+ @context.reader.read(path)
end
- # Registers a participant in the engine. Returns the participant instance.
+ # Registers a participant in the engine.
#
- # Some examples :
+ # Takes the form
#
- # require 'ruote/part/hash_participant'
- # alice = engine.register_participant 'alice', Ruote::HashParticipant
- # # register an in-memory (hash) store for Alice's workitems
+ # engine.register_participant name_or_regex, klass, opts={}
#
+ # With the form
+ #
+ # engine.register_participant name_or_regex do |workitem|
+ # # ...
+ # end
+ #
+ # A BlockParticipant is automatically created.
+ #
+ #
+ # == name or regex
+ #
+ # When registering participants, strings or regexes are accepted. Behind
+ # the scenes, a regex is kept.
+ #
+ # Passing a string like "alain" will get ruote to automatically turn it
+ # into the following regex : /^alain$/.
+ #
+ # For finer control over this, pass a regex directly
+ #
+ # engine.register_participant /^user-/, MyParticipant
+ # # will match all workitems whose participant name starts with "user-"
+ #
+ #
+ # == some examples
+ #
# engine.register_participant 'compute_sum' do |wi|
# wi.fields['sum'] = wi.fields['articles'].inject(0) do |s, (c, v)|
# s + c * v # sum + count * value
# end
# # a block participant implicitely replies to the engine immediately
# end
#
# class MyParticipant
- # def initialize (name)
- # @name = name
+ # def initialize(opts)
+ # @name = opts['name']
# end
- # def consume (workitem)
+ # def consume(workitem)
# workitem.fields['rocket_name'] = @name
# send_to_the_moon(workitem)
# end
- # def cancel (fei, flavour)
+ # def cancel(fei, flavour)
# # do nothing
# end
# end
- # engine.register_participant /^moon-.+/, MyParticipant.new('Saturn-V')
#
+ # engine.register_participant(
+ # /^moon-.+/, MyParticipant, 'name' => 'Saturn-V')
#
- # == 'stateless' participants are preferred over 'stateful' ones
+ # # computing the total for a invoice being passed in the workitem.
+ # #
+ # class TotalParticipant
+ # include Ruote::LocalParticipant
#
- # Ruote 2.1 is OK with 1 storage and 1+ workers. The workers may be
- # in other ruby runtimes. This implies that if you have registered a
- # participant instance (instead of passing its classname and options),
- # that participant will only run in the worker 'embedded' in the engine
- # where it was registered... Let me rephrase it, participants instantiated
- # at registration time (and that includes block participants) only runs
- # in one worker, always the same.
- #
- # 'stateless' participants, instantiated at each dispatch, are preferred.
- # Any worker can handle them.
- #
- # Block participants are still fine for demos (where the worker is included
- # in the engine (see all the quickstarts). And small engines with 1 worker
- # are not that bad, not everybody is building huge systems).
- #
- # Here is a 'stateless' participant example :
- #
- # class MyStatelessParticipant
- # def initialize (opts)
- # @opts = opts
+ # def consume(workitem)
+ # workitem['total'] = workitem.fields['items'].inject(0.0) { |t, item|
+ # t + item['count'] * PricingService.lookup(item['id'])
+ # }
+ # reply_to_engine(workitem)
# end
- # def consume (workitem)
- # workitem.fields['rocket_name'] = @opts['name']
- # send_to_the_moon(workitem)
- # end
- # def cancel (fei, flavour)
- # # do nothing
- # end
# end
+ # engine.register_participant 'total', TotalParticipant
#
- # engine.register_participant(
- # 'moon', MyStatelessParticipant, 'name' => 'saturn5')
- #
# Remember that the options (the hash that follows the class name), must be
- # serialisable via JSON.
+ # serializable via JSON.
#
#
# == require_path and load_path
#
# It's OK to register a participant by passing its full classname as a
@@ -478,18 +512,22 @@
#
# Note the option load_path / require_path that point to the ruby file
# containing the participant implementation. 'require' will load and eval
# the ruby code only once, 'load' each time.
#
- def register_participant (regex, participant=nil, opts={}, &block)
+ def register_participant(regex, participant=nil, opts={}, &block)
+ if participant.is_a?(Hash)
+ opts = participant
+ participant = nil
+ end
+
pa = @context.plist.register(regex, participant, opts, block)
@context.storage.put_msg(
'participant_registered',
- 'regex' => regex.to_s,
- 'engine_worker_only' => (pa != nil))
+ 'regex' => regex.is_a?(Regexp) ? regex.inspect : regex.to_s)
pa
end
# A shorter version of #register_participant
@@ -504,11 +542,11 @@
# catchall ParticipantCharlie, 'flavour' => 'coconut'
# end
#
# Originally implemented in ruote-kit by Torsten Schoenebaum.
#
- def register (*args, &block)
+ def register(*args, &block)
if args.size > 0
register_participant(*args, &block)
else
proxy = ParticipantRegistrationProxy.new(self)
@@ -516,11 +554,11 @@
end
end
# Removes/unregisters a participant from the engine.
#
- def unregister_participant (name_or_participant)
+ def unregister_participant(name_or_participant)
re = @context.plist.unregister(name_or_participant)
raise(ArgumentError.new('participant not found')) unless re
@@ -554,15 +592,26 @@
def participant_list
@context.plist.list
end
- # Accepts a list of Ruote::ParticipantEntry instances.
+ # Accepts a list of Ruote::ParticipantEntry instances or a list of
+ # [ regex, [ classname, opts ] ] arrays.
#
# See Engine#participant_list
#
- def participant_list= (pl)
+ # Some examples :
+ #
+ # engine.participant_list = [
+ # [ '^charly$', [ 'Ruote::StorageParticipant', {} ] ],
+ # [ '.+', [ 'MyDefaultParticipant', { 'default' => true } ]
+ # ]
+ #
+ # This method writes the participant list in one go, it might be easier to
+ # use than to register participant one by ones.
+ #
+ def participant_list=(pl)
@context.plist.list = pl
end
# A convenience method for
@@ -576,10 +625,18 @@
def storage_participant
@storage_participant ||= Ruote::StorageParticipant.new(self)
end
+ # Returns an instance of the participant registered under the given name.
+ # Returns nil if there is no participant registered for that name.
+ #
+ def participant(name)
+
+ @context.plist.lookup(name, nil)
+ end
+
# Adds a service locally (will not get propagated to other workers).
#
# tracer = Tracer.new
# @engine.add_service('tracer', tracer)
#
@@ -587,11 +644,11 @@
#
# @engine.add_service('tracer', 'ruote/exp/tracer', 'Ruote::Exp::Tracer')
#
# This method returns the service instance it just bound.
#
- def add_service (name, path_or_instance, classname=nil, opts=nil)
+ def add_service(name, path_or_instance, classname=nil, opts=nil)
@context.add_service(name, path_or_instance, classname, opts)
end
# Sets a configuration option. Examples:
@@ -601,60 +658,142 @@
# @engine.configure('remote_definition_allowed', true)
#
# # allow ruby_eval
# @engine.configure('ruby_eval_allowed', true)
#
- def configure (config_key, value)
+ def configure(config_key, value)
@context[config_key] = value
end
- # A convenience methods for advanced users (like Oleg).
+ # Returns a configuration value.
#
- # Given a fei (flow expression id), fetches the workitem as stored in
- # the expression with that fei.
- # This is the "applied workitem", if the workitem is currently handed to
- # a participant, this method will return the workitem as applied, not
- # the workitem as saved by the participant/user in whatever worklist it
- # uses. If you need that workitem, do the vanilla thing and ask it to
- # the [storage] participant or its worklist.
+ # engine.configure('ruby_eval_allowed', true)
#
- # The fei might be a string fei (result of fei.to_storage_id), a
- # FlowExpressionId instance or a hash.
+ # p engine.configuration('ruby_eval_allowed')
+ # # => true
#
- def workitem (fei)
+ def configuration(config_key)
- fexp = Ruote::Exp::FlowExpression.fetch(
- @context, Ruote::FlowExpressionId.extract_h(fei))
+ @context[config_key]
+ end
- Ruote::Workitem.new(fexp.h.applied_workitem)
+ # Returns the process tree that is triggered in case of error.
+ #
+ # Note that this 'on_error' doesn't trigger if an on_error is defined
+ # in the process itself.
+ #
+ # Returns nil if there is no 'on_error' set.
+ #
+ def on_error
+
+ @context.storage.get_trackers['trackers']['on_error']['msg']['tree']
+
+ rescue
+ nil
end
+ # Returns the process tree that is triggered in case of process termination.
+ #
+ # Note that a termination process doesn't raise a termination process when
+ # it terminates itself.
+ #
+ # Returns nil if there is no 'on_terminate' set.
+ #
+ def on_terminate
+
+ @context.storage.get_trackers['trackers']['on_terminate']['msg']['tree']
+
+ rescue
+ nil
+ end
+
+ # Sets a participant or subprocess to be triggered when an error occurs
+ # in a process instance.
+ #
+ # engine.on_error = participant_name
+ #
+ # engine.on_error = subprocess_name
+ #
+ # engine.on_error = Ruote.process_definition do
+ # alpha
+ # end
+ #
+ # Note that this 'on_error' doesn't trigger if an on_error is defined
+ # in the process itself.
+ #
+ def on_error=(target)
+
+ @context.tracker.add_tracker(
+ nil, # do not track a specific wfid
+ 'error_intercepted', # react on 'error_intercepted' msgs
+ 'on_error', # the identifier
+ nil, # no specific condition
+ { 'action' => 'launch',
+ 'wfid' => 'replace',
+ 'tree' => target.is_a?(String) ?
+ [ 'define', {}, [ [ target, {}, [] ] ] ] : target,
+ 'workitem' => 'replace',
+ 'variables' => 'compile' })
+ end
+
+ # Sets a participant or a subprocess that is to be launched/called whenever
+ # a regular process terminates.
+ #
+ # engine.on_terminate = participant_name
+ #
+ # engine.on_terminate = subprocess_name
+ #
+ # engine.on_terminate = Ruote.define do
+ # alpha
+ # bravo
+ # end
+ #
+ # Note that a termination process doesn't raise a termination process when
+ # it terminates itself.
+ #
+ # on_terminate processes are not triggered for on_error processes.
+ # on_error processes are triggered for on_terminate processes as well.
+ #
+ def on_terminate=(target)
+
+ @context.tracker.add_tracker(
+ nil, # do not track a specific wfid
+ 'terminated', # react on 'error_intercepted' msgs
+ 'on_terminate', # the identifier
+ nil, # no specific condition
+ { 'action' => 'launch',
+ 'tree' => target.is_a?(String) ?
+ [ 'define', {}, [ [ target, {}, [] ] ] ] : target,
+ 'workitem' => 'replace' })
+ end
+
# A debug helper :
#
# engine.noisy = true
#
# will let the engine (in fact the worker) pour all the details of the
# executing process instances to STDOUT.
#
- def noisy= (b)
+ def noisy=(b)
@context.logger.noisy = b
end
protected
# Used by #process and #processes
#
- def list_processes (wfids, opts)
+ def statuses(wfids, opts)
- swfids = wfids ? wfids.collect { |wfid| /!#{wfid}-\d+$/ } : nil
+ swfids = wfids.collect { |wfid| /!#{wfid}-\d+$/ }
- exps = @context.storage.get_many('expressions', wfids)
- swis = @context.storage.get_many('workitems', wfids)
- errs = @context.storage.get_many('errors', wfids)
- schs = @context.storage.get_many('schedules', swfids)
+ exps = @context.storage.get_many('expressions', wfids).compact
+ swis = @context.storage.get_many('workitems', wfids).compact
+ errs = @context.storage.get_many('errors', wfids).compact
+ schs = @context.storage.get_many('schedules', swfids).compact
+ # some slow storages need the compaction... couch...
errs = errs.collect { |err| ProcessError.new(err) }
schs = schs.collect { |sch| Ruote.schedule_to_h(sch) }
by_wfid = {}
@@ -670,17 +809,13 @@
end
schs.each do |sch|
(by_wfid[sch['wfid']] ||= [ [], [], [], [] ])[3] << sch
end
- wfids = if wfids
- wfids
- else
- wfids = by_wfid.keys.sort
- wfids = wfids.reverse if opts[:descending]
- wfids
- end
+ wfids = by_wfid.keys.sort
+ wfids = wfids.reverse if opts[:descending]
+ # re-adjust list of wfids, only take what was found
wfids.inject([]) { |a, wfid|
info = by_wfid[wfid]
a << ProcessStatus.new(@context, *info) if info
a
@@ -694,21 +829,21 @@
# There is one instance of this class for an Engine instance. It is
# returned when calling Engine#variables.
#
class EngineVariables
- def initialize (storage)
+ def initialize(storage)
@storage = storage
end
- def [] (k)
+ def [](k)
@storage.get_engine_variable(k)
end
- def []= (k, v)
+ def []=(k, v)
@storage.put_engine_variable(k, v)
end
end
@@ -717,39 +852,39 @@
#
# Originally written by Torsten Schoenebaum for ruote-kit.
#
class ParticipantRegistrationProxy
- def initialize (engine)
+ def initialize(engine)
@engine = engine
end
- def participant (name, klass, options={})
+ def participant(name, klass=nil, options={}, &block)
- @engine.register_participant(name, klass, options)
+ @engine.register_participant(name, klass, options, &block)
end
- def catchall (*args)
+ def catchall(*args)
klass = args.empty? ? Ruote::StorageParticipant : args.first
options = args[1] || {}
participant('.+', klass, options)
end
# Maybe a bit audacious...
#
- def method_missing (method_name, *args)
+ def method_missing(method_name, *args)
participant(method_name, *args)
end
end
# Refines a schedule as found in the ruote storage into something a bit
# easier to present.
#
- def self.schedule_to_h (sched)
+ def self.schedule_to_h(sched)
h = sched.dup
h.delete('_rev')
h.delete('type')