lib/ruote/engine.rb in ruote-2.1.11 vs lib/ruote/engine.rb in ruote-2.2.0

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -56,11 +56,11 @@ # manage and run workflows. # # If the second options is set to { :join => true }, the worker wil # be started and run in the current thread. # - def initialize (worker_or_storage, opts=true) + def initialize(worker_or_storage, opts=true) @context = worker_or_storage.context @context.engine = self @variables = EngineVariables.new(@context.storage) @@ -95,10 +95,17 @@ def worker @context.worker end + # A shortcut for engine.context.history + # + def history + + @context.history + end + # Quick note : the implementation of launch is found in the module # Ruote::ReceiverMixin that the engine includes. # # Some processes have to have one and only one instance of themselves # running, these are called 'singles' ('singleton' is too object-oriented). @@ -108,13 +115,13 @@ # yes, it will return without having launched anything. If there is no # such process running, it will launch it (and register it). # # Returns the wfid (workflow instance id) of the running single. # - def launch_single (process_definition, fields={}, variables={}) + def launch_single(process_definition, fields={}, variables={}) - tree = @context.parser.parse(process_definition) + tree = @context.reader.read(process_definition) name = tree[1]['name'] || (tree[1].find { |k, v| v.nil? } || []).first raise ArgumentError.new( 'process definition is missing a name, cannot launch as single' ) unless name @@ -122,14 +129,12 @@ singles = @context.storage.get('variables', 'singles') || { '_id' => 'singles', 'type' => 'variables', 'h' => {} } wfid, timestamp = singles['h'][name] - if wfid && (timestamp + 1.0 < Time.now.to_f || process(wfid) != nil) - return wfid - end - # process is already running + return wfid if wfid && (ps(wfid) || Time.now.to_f - timestamp < 1.0) + # return wfid if 'singleton' process is already running wfid = @context.wfidgen.generate singles['h'][name] = [ wfid, Time.now.to_f ] @@ -152,49 +157,48 @@ 'variables' => variables) wfid end - # Given a process identifier (wfid), cancels this process. + # Given a workitem or a fei, will do a cancel_expression, + # else it's a wfid and it does a cancel_process. # - def cancel_process (wfid) + def cancel(wi_or_fei_or_wfid) - @context.storage.put_msg('cancel_process', 'wfid' => wfid) - end + target = Ruote.extract_id(wi_or_fei_or_wfid) - # Given a process identifier (wfid), kills this process. Killing is - # equivalent to cancelling, but when killing, :on_cancel attributes - # are not triggered. - # - def kill_process (wfid) - - @context.storage.put_msg('kill_process', 'wfid' => wfid) + if target.is_a?(String) + @context.storage.put_msg('cancel_process', 'wfid' => target) + else + @context.storage.put_msg('cancel', 'fei' => target) + end end - # Cancels a segment of process instance. Since expressions are nodes in - # processes instances, cancelling an expression, will cancel the expression - # and all its children (the segment of process). - # - def cancel_expression (fei) + alias cancel_process cancel + alias cancel_expression cancel - fei = fei.to_h if fei.respond_to?(:to_h) - @context.storage.put_msg('cancel', 'fei' => fei) - end - - # Like #cancel_expression, but :on_cancel attributes (of the expressions) - # are not triggered. + # Given a workitem or a fei, will do a kill_expression, + # else it's a wfid and it does a kill_process. # - def kill_expression (fei) + def kill(wi_or_fei_or_wfid) - fei = fei.to_h if fei.respond_to?(:to_h) - @context.storage.put_msg('cancel', 'fei' => fei, 'flavour' => 'kill') + target = Ruote.extract_id(wi_or_fei_or_wfid) + + if target.is_a?(String) + @context.storage.put_msg('kill_process', 'wfid' => target) + else + @context.storage.put_msg('cancel', 'fei' => target, 'flavour' => 'kill') + end end + alias kill_process kill + alias kill_expression kill + # Replays at a given error (hopefully you fixed the cause of the error # before replaying...) # - def replay_at_error (err) + def replay_at_error(err) msg = err.msg.dup action = msg.delete('action') msg['replay_at_error'] = true @@ -231,21 +235,21 @@ # # :merge_in_fields is used to add / override fields # # engine.re_apply(fei, :merge_in_fields => { 'customer' => 'bob' }) # - def re_apply (fei, opts={}) + def re_apply(fei, opts={}) @context.storage.put_msg('cancel', 'fei' => fei.to_h, 're_apply' => opts) end # Returns a ProcessStatus instance describing the current status of # a process instance. # - def process (wfid) + def process(wfid) - list_processes([ wfid ], {}).first + statuses([ wfid ], {}).first end # Returns an array of ProcessStatus instances. # # WARNING : this is an expensive operation, but it understands :skip @@ -255,22 +259,23 @@ # Engine#errors is a more efficient means. # # To simply list the wfids of the currently running, Engine#process_wfids # is way cheaper to call. # - def processes (opts={}) + def processes(opts={}) - wfids = nil + wfids = @context.storage.expression_wfids(opts) - if opts.size > 0 + opts[:count] ? wfids.size : statuses(wfids, opts) + end - wfids = @context.storage.expression_wfids(opts) + # Returns a list of processes or the process status of a given process + # instance. + # + def ps(wfid=nil) - return wfids.size if opts[:count] - end - - list_processes(wfids, opts) + wfid == nil ? processes : process(wfid) end # Returns an array of current errors (hashes) # # Can be called in two ways : @@ -279,11 +284,11 @@ # # and # # engine.errors(:skip => 100, :limit => 100) # - def errors (wfid=nil) + def errors(wfid=nil) wfid, options = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ] errs = wfid.nil? ? @context.storage.get_many('errors', nil, options) : @@ -305,36 +310,62 @@ # # and # # engine.schedules(:skip => 100, :limit => 100) # - def schedules (wfid=nil) + def schedules(wfid=nil) wfid, options = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ] scheds = wfid.nil? ? @context.storage.get_many('schedules', nil, options) : @context.storage.get_many('schedules', /!#{wfid}-\d+$/) return scheds if options[:count] - scheds.collect { |sched| Ruote.schedule_to_h(sched) } + scheds.collect { |s| Ruote.schedule_to_h(s) }.sort_by { |s| s['wfid'] } end # Returns a [sorted] list of wfids of the process instances currently # running in the engine. # # This operation is substantially less costly than Engine#processes (though # the 'how substantially' depends on the storage chosen). # - def process_wfids + def process_ids - @context.storage.ids('expressions').collect { |sfei| - sfei.split('!').last - }.uniq.sort + @context.storage.expression_wfids({}) end + alias process_wfids process_ids + + # Warning : expensive operation. + # + # Leftovers are workitems, errors and schedules belonging to process + # instances for which there are no more expressions left. + # + # Better delete them or investigate why they are left here. + # + # The result is a list of documents (hashes) as found in the storage. Each + # of them might represent a workitem, an error or a schedule. + # + # If you want to delete one of them you can do + # + # engine.storage.delete(doc) + # + def leftovers + + wfids = @context.storage.expression_wfids({}) + + wis = @context.storage.get_many('workitems').compact + ers = @context.storage.get_many('errors').compact + scs = @context.storage.get_many('schedules').compact + # some slow storages need the compaction... [c]ouch... + + (wis + ers + scs).reject { |doc| wfids.include?(doc['fei']['wfid']) } + end + # Shuts down the engine, mostly passes the shutdown message to the other # services and hope they'll shut down properly. # def shutdown @@ -369,11 +400,11 @@ # # It's OK to wait for multiple wfids : # # engine.wait_for('20100612-bezerijozo', '20100612-yakisoba') # - def wait_for (*items) + def wait_for(*items) logger = @context['s_logger'] raise( "can't wait_for, there is no logger that responds to that call" @@ -388,84 +419,87 @@ def join worker.join if worker end - # Loads and parses the process definition at the given path. + # Loads (and turns into a tree) the process definition at the given path. # - def load_definition (path) + def load_definition(path) - @context.parser.parse(path) + @context.reader.read(path) end - # Registers a participant in the engine. Returns the participant instance. + # Registers a participant in the engine. # - # Some examples : + # Takes the form # - # require 'ruote/part/hash_participant' - # alice = engine.register_participant 'alice', Ruote::HashParticipant - # # register an in-memory (hash) store for Alice's workitems + # engine.register_participant name_or_regex, klass, opts={} # + # With the form + # + # engine.register_participant name_or_regex do |workitem| + # # ... + # end + # + # A BlockParticipant is automatically created. + # + # + # == name or regex + # + # When registering participants, strings or regexes are accepted. Behind + # the scenes, a regex is kept. + # + # Passing a string like "alain" will get ruote to automatically turn it + # into the following regex : /^alain$/. + # + # For finer control over this, pass a regex directly + # + # engine.register_participant /^user-/, MyParticipant + # # will match all workitems whose participant name starts with "user-" + # + # + # == some examples + # # engine.register_participant 'compute_sum' do |wi| # wi.fields['sum'] = wi.fields['articles'].inject(0) do |s, (c, v)| # s + c * v # sum + count * value # end # # a block participant implicitely replies to the engine immediately # end # # class MyParticipant - # def initialize (name) - # @name = name + # def initialize(opts) + # @name = opts['name'] # end - # def consume (workitem) + # def consume(workitem) # workitem.fields['rocket_name'] = @name # send_to_the_moon(workitem) # end - # def cancel (fei, flavour) + # def cancel(fei, flavour) # # do nothing # end # end - # engine.register_participant /^moon-.+/, MyParticipant.new('Saturn-V') # + # engine.register_participant( + # /^moon-.+/, MyParticipant, 'name' => 'Saturn-V') # - # == 'stateless' participants are preferred over 'stateful' ones + # # computing the total for a invoice being passed in the workitem. + # # + # class TotalParticipant + # include Ruote::LocalParticipant # - # Ruote 2.1 is OK with 1 storage and 1+ workers. The workers may be - # in other ruby runtimes. This implies that if you have registered a - # participant instance (instead of passing its classname and options), - # that participant will only run in the worker 'embedded' in the engine - # where it was registered... Let me rephrase it, participants instantiated - # at registration time (and that includes block participants) only runs - # in one worker, always the same. - # - # 'stateless' participants, instantiated at each dispatch, are preferred. - # Any worker can handle them. - # - # Block participants are still fine for demos (where the worker is included - # in the engine (see all the quickstarts). And small engines with 1 worker - # are not that bad, not everybody is building huge systems). - # - # Here is a 'stateless' participant example : - # - # class MyStatelessParticipant - # def initialize (opts) - # @opts = opts + # def consume(workitem) + # workitem['total'] = workitem.fields['items'].inject(0.0) { |t, item| + # t + item['count'] * PricingService.lookup(item['id']) + # } + # reply_to_engine(workitem) # end - # def consume (workitem) - # workitem.fields['rocket_name'] = @opts['name'] - # send_to_the_moon(workitem) - # end - # def cancel (fei, flavour) - # # do nothing - # end # end + # engine.register_participant 'total', TotalParticipant # - # engine.register_participant( - # 'moon', MyStatelessParticipant, 'name' => 'saturn5') - # # Remember that the options (the hash that follows the class name), must be - # serialisable via JSON. + # serializable via JSON. # # # == require_path and load_path # # It's OK to register a participant by passing its full classname as a @@ -478,18 +512,22 @@ # # Note the option load_path / require_path that point to the ruby file # containing the participant implementation. 'require' will load and eval # the ruby code only once, 'load' each time. # - def register_participant (regex, participant=nil, opts={}, &block) + def register_participant(regex, participant=nil, opts={}, &block) + if participant.is_a?(Hash) + opts = participant + participant = nil + end + pa = @context.plist.register(regex, participant, opts, block) @context.storage.put_msg( 'participant_registered', - 'regex' => regex.to_s, - 'engine_worker_only' => (pa != nil)) + 'regex' => regex.is_a?(Regexp) ? regex.inspect : regex.to_s) pa end # A shorter version of #register_participant @@ -504,11 +542,11 @@ # catchall ParticipantCharlie, 'flavour' => 'coconut' # end # # Originally implemented in ruote-kit by Torsten Schoenebaum. # - def register (*args, &block) + def register(*args, &block) if args.size > 0 register_participant(*args, &block) else proxy = ParticipantRegistrationProxy.new(self) @@ -516,11 +554,11 @@ end end # Removes/unregisters a participant from the engine. # - def unregister_participant (name_or_participant) + def unregister_participant(name_or_participant) re = @context.plist.unregister(name_or_participant) raise(ArgumentError.new('participant not found')) unless re @@ -554,15 +592,26 @@ def participant_list @context.plist.list end - # Accepts a list of Ruote::ParticipantEntry instances. + # Accepts a list of Ruote::ParticipantEntry instances or a list of + # [ regex, [ classname, opts ] ] arrays. # # See Engine#participant_list # - def participant_list= (pl) + # Some examples : + # + # engine.participant_list = [ + # [ '^charly$', [ 'Ruote::StorageParticipant', {} ] ], + # [ '.+', [ 'MyDefaultParticipant', { 'default' => true } ] + # ] + # + # This method writes the participant list in one go, it might be easier to + # use than to register participant one by ones. + # + def participant_list=(pl) @context.plist.list = pl end # A convenience method for @@ -576,10 +625,18 @@ def storage_participant @storage_participant ||= Ruote::StorageParticipant.new(self) end + # Returns an instance of the participant registered under the given name. + # Returns nil if there is no participant registered for that name. + # + def participant(name) + + @context.plist.lookup(name, nil) + end + # Adds a service locally (will not get propagated to other workers). # # tracer = Tracer.new # @engine.add_service('tracer', tracer) # @@ -587,11 +644,11 @@ # # @engine.add_service('tracer', 'ruote/exp/tracer', 'Ruote::Exp::Tracer') # # This method returns the service instance it just bound. # - def add_service (name, path_or_instance, classname=nil, opts=nil) + def add_service(name, path_or_instance, classname=nil, opts=nil) @context.add_service(name, path_or_instance, classname, opts) end # Sets a configuration option. Examples: @@ -601,60 +658,142 @@ # @engine.configure('remote_definition_allowed', true) # # # allow ruby_eval # @engine.configure('ruby_eval_allowed', true) # - def configure (config_key, value) + def configure(config_key, value) @context[config_key] = value end - # A convenience methods for advanced users (like Oleg). + # Returns a configuration value. # - # Given a fei (flow expression id), fetches the workitem as stored in - # the expression with that fei. - # This is the "applied workitem", if the workitem is currently handed to - # a participant, this method will return the workitem as applied, not - # the workitem as saved by the participant/user in whatever worklist it - # uses. If you need that workitem, do the vanilla thing and ask it to - # the [storage] participant or its worklist. + # engine.configure('ruby_eval_allowed', true) # - # The fei might be a string fei (result of fei.to_storage_id), a - # FlowExpressionId instance or a hash. + # p engine.configuration('ruby_eval_allowed') + # # => true # - def workitem (fei) + def configuration(config_key) - fexp = Ruote::Exp::FlowExpression.fetch( - @context, Ruote::FlowExpressionId.extract_h(fei)) + @context[config_key] + end - Ruote::Workitem.new(fexp.h.applied_workitem) + # Returns the process tree that is triggered in case of error. + # + # Note that this 'on_error' doesn't trigger if an on_error is defined + # in the process itself. + # + # Returns nil if there is no 'on_error' set. + # + def on_error + + @context.storage.get_trackers['trackers']['on_error']['msg']['tree'] + + rescue + nil end + # Returns the process tree that is triggered in case of process termination. + # + # Note that a termination process doesn't raise a termination process when + # it terminates itself. + # + # Returns nil if there is no 'on_terminate' set. + # + def on_terminate + + @context.storage.get_trackers['trackers']['on_terminate']['msg']['tree'] + + rescue + nil + end + + # Sets a participant or subprocess to be triggered when an error occurs + # in a process instance. + # + # engine.on_error = participant_name + # + # engine.on_error = subprocess_name + # + # engine.on_error = Ruote.process_definition do + # alpha + # end + # + # Note that this 'on_error' doesn't trigger if an on_error is defined + # in the process itself. + # + def on_error=(target) + + @context.tracker.add_tracker( + nil, # do not track a specific wfid + 'error_intercepted', # react on 'error_intercepted' msgs + 'on_error', # the identifier + nil, # no specific condition + { 'action' => 'launch', + 'wfid' => 'replace', + 'tree' => target.is_a?(String) ? + [ 'define', {}, [ [ target, {}, [] ] ] ] : target, + 'workitem' => 'replace', + 'variables' => 'compile' }) + end + + # Sets a participant or a subprocess that is to be launched/called whenever + # a regular process terminates. + # + # engine.on_terminate = participant_name + # + # engine.on_terminate = subprocess_name + # + # engine.on_terminate = Ruote.define do + # alpha + # bravo + # end + # + # Note that a termination process doesn't raise a termination process when + # it terminates itself. + # + # on_terminate processes are not triggered for on_error processes. + # on_error processes are triggered for on_terminate processes as well. + # + def on_terminate=(target) + + @context.tracker.add_tracker( + nil, # do not track a specific wfid + 'terminated', # react on 'error_intercepted' msgs + 'on_terminate', # the identifier + nil, # no specific condition + { 'action' => 'launch', + 'tree' => target.is_a?(String) ? + [ 'define', {}, [ [ target, {}, [] ] ] ] : target, + 'workitem' => 'replace' }) + end + # A debug helper : # # engine.noisy = true # # will let the engine (in fact the worker) pour all the details of the # executing process instances to STDOUT. # - def noisy= (b) + def noisy=(b) @context.logger.noisy = b end protected # Used by #process and #processes # - def list_processes (wfids, opts) + def statuses(wfids, opts) - swfids = wfids ? wfids.collect { |wfid| /!#{wfid}-\d+$/ } : nil + swfids = wfids.collect { |wfid| /!#{wfid}-\d+$/ } - exps = @context.storage.get_many('expressions', wfids) - swis = @context.storage.get_many('workitems', wfids) - errs = @context.storage.get_many('errors', wfids) - schs = @context.storage.get_many('schedules', swfids) + exps = @context.storage.get_many('expressions', wfids).compact + swis = @context.storage.get_many('workitems', wfids).compact + errs = @context.storage.get_many('errors', wfids).compact + schs = @context.storage.get_many('schedules', swfids).compact + # some slow storages need the compaction... couch... errs = errs.collect { |err| ProcessError.new(err) } schs = schs.collect { |sch| Ruote.schedule_to_h(sch) } by_wfid = {} @@ -670,17 +809,13 @@ end schs.each do |sch| (by_wfid[sch['wfid']] ||= [ [], [], [], [] ])[3] << sch end - wfids = if wfids - wfids - else - wfids = by_wfid.keys.sort - wfids = wfids.reverse if opts[:descending] - wfids - end + wfids = by_wfid.keys.sort + wfids = wfids.reverse if opts[:descending] + # re-adjust list of wfids, only take what was found wfids.inject([]) { |a, wfid| info = by_wfid[wfid] a << ProcessStatus.new(@context, *info) if info a @@ -694,21 +829,21 @@ # There is one instance of this class for an Engine instance. It is # returned when calling Engine#variables. # class EngineVariables - def initialize (storage) + def initialize(storage) @storage = storage end - def [] (k) + def [](k) @storage.get_engine_variable(k) end - def []= (k, v) + def []=(k, v) @storage.put_engine_variable(k, v) end end @@ -717,39 +852,39 @@ # # Originally written by Torsten Schoenebaum for ruote-kit. # class ParticipantRegistrationProxy - def initialize (engine) + def initialize(engine) @engine = engine end - def participant (name, klass, options={}) + def participant(name, klass=nil, options={}, &block) - @engine.register_participant(name, klass, options) + @engine.register_participant(name, klass, options, &block) end - def catchall (*args) + def catchall(*args) klass = args.empty? ? Ruote::StorageParticipant : args.first options = args[1] || {} participant('.+', klass, options) end # Maybe a bit audacious... # - def method_missing (method_name, *args) + def method_missing(method_name, *args) participant(method_name, *args) end end # Refines a schedule as found in the ruote storage into something a bit # easier to present. # - def self.schedule_to_h (sched) + def self.schedule_to_h(sched) h = sched.dup h.delete('_rev') h.delete('type')