lib/ruote/engine.rb in ruote-2.1.10 vs lib/ruote/engine.rb in ruote-2.1.11

- old
+ new

@@ -34,56 +34,157 @@ # a better name. Anyway, the methods here allow to launch processes # and to query about their status. There are also methods for fixing # issues with stalled processes or processes stuck in errors. # # NOTE : the methods #launch and #reply are implemented in - # Ruote::ReceiverMixin + # Ruote::ReceiverMixin (this Engine class has all the methods of a Receiver). # class Engine include ReceiverMixin attr_reader :context attr_reader :variables - def initialize (worker_or_storage, run=true) + # Creates an engine using either worker or storage. + # + # If a storage instance is given as the first argument, the engine will be + # able to manage processes (for example, launch and cancel workflows) but + # will not actually run any workflows. + # + # If a worker instance is given as the first argument and the second + # argument is true, engine will start the worker and will be able to both + # manage and run workflows. + # + # If the second options is set to { :join => true }, the worker wil + # be started and run in the current thread. + # + def initialize (worker_or_storage, opts=true) @context = worker_or_storage.context @context.engine = self @variables = EngineVariables.new(@context.storage) - @context.worker.run_in_thread if @context.worker && run - # launch the worker if there is one + if @context.worker + if opts == true + @context.worker.run_in_thread + # runs worker in its own thread + elsif opts == { :join => true } + @context.worker.run + # runs worker in current thread (and doesn't return) + #else + # worker is not run + end + #else + # no worker + end end + # Returns the storage this engine works with passed at engine + # initialization. + # def storage @context.storage end + # Returns the worker nested inside this engine (passed at initialization). + # Returns nil if this engine is only linked to a storage (and the worker + # is running somewhere else (hopefully)). + # def worker @context.worker end + # Quick note : the implementation of launch is found in the module + # Ruote::ReceiverMixin that the engine includes. + # + # Some processes have to have one and only one instance of themselves + # running, these are called 'singles' ('singleton' is too object-oriented). + # + # When called, this method will check if an instance of the pdef is + # already running (it uses the process definition name attribute), if + # yes, it will return without having launched anything. If there is no + # such process running, it will launch it (and register it). + # + # Returns the wfid (workflow instance id) of the running single. + # + def launch_single (process_definition, fields={}, variables={}) + + tree = @context.parser.parse(process_definition) + name = tree[1]['name'] || (tree[1].find { |k, v| v.nil? } || []).first + + raise ArgumentError.new( + 'process definition is missing a name, cannot launch as single' + ) unless name + + singles = @context.storage.get('variables', 'singles') || { + '_id' => 'singles', 'type' => 'variables', 'h' => {} + } + wfid, timestamp = singles['h'][name] + + if wfid && (timestamp + 1.0 < Time.now.to_f || process(wfid) != nil) + return wfid + end + # process is already running + + wfid = @context.wfidgen.generate + + singles['h'][name] = [ wfid, Time.now.to_f ] + + r = @context.storage.put(singles) + + return launch_single(tree, fields, variables) unless r.nil? + # + # the put failed, back to the start... + # + # all this to prevent races between multiple engines, + # multiple launch_single calls (from different Ruby runtimes) + + # ... green for launch + + @context.storage.put_msg( + 'launch', + 'wfid' => wfid, + 'tree' => tree, + 'workitem' => { 'fields' => fields }, + 'variables' => variables) + + wfid + end + + # Given a process identifier (wfid), cancels this process. + # def cancel_process (wfid) @context.storage.put_msg('cancel_process', 'wfid' => wfid) end + # Given a process identifier (wfid), kills this process. Killing is + # equivalent to cancelling, but when killing, :on_cancel attributes + # are not triggered. + # def kill_process (wfid) @context.storage.put_msg('kill_process', 'wfid' => wfid) end + # Cancels a segment of process instance. Since expressions are nodes in + # processes instances, cancelling an expression, will cancel the expression + # and all its children (the segment of process). + # def cancel_expression (fei) fei = fei.to_h if fei.respond_to?(:to_h) @context.storage.put_msg('cancel', 'fei' => fei) end + # Like #cancel_expression, but :on_cancel attributes (of the expressions) + # are not triggered. + # def kill_expression (fei) fei = fei.to_h if fei.respond_to?(:to_h) @context.storage.put_msg('cancel', 'fei' => fei, 'flavour' => 'kill') end @@ -140,58 +241,115 @@ # Returns a ProcessStatus instance describing the current status of # a process instance. # def process (wfid) - exps = @context.storage.get_many('expressions', /!#{wfid}$/) - errs = self.errors( wfid ) - - return nil if exps.empty? && errs.empty? - - ProcessStatus.new(@context, exps, errs) + list_processes([ wfid ], {}).first end # Returns an array of ProcessStatus instances. # - # WARNING : this is an expensive operation. + # WARNING : this is an expensive operation, but it understands :skip + # and :limit, so pagination is our friend. # - def processes + # Please note, if you're interested only in processes that have errors, + # Engine#errors is a more efficient means. + # + # To simply list the wfids of the currently running, Engine#process_wfids + # is way cheaper to call. + # + def processes (opts={}) - exps = @context.storage.get_many('expressions') - errs = self.errors + wfids = nil - by_wfid = {} + if opts.size > 0 - exps.each do |exp| - (by_wfid[exp['fei']['wfid']] ||= [ [], [] ]).first << exp + wfids = @context.storage.expression_wfids(opts) + + return wfids.size if opts[:count] end - errs.each do |err| - (by_wfid[err['msg']['fei']['wfid']] ||= [ [], [] ]).last << err - end - by_wfid.values.collect { |xs, rs| ProcessStatus.new(@context, xs, rs) } + list_processes(wfids, opts) end # Returns an array of current errors (hashes) # - def errors( wfid = nil ) - wfid.nil? ? - @context.storage.get_many('errors') : - @context.storage.get_many('errors', /!#{wfid}$/) + # Can be called in two ways : + # + # engine.errors(wfid) + # + # and + # + # engine.errors(:skip => 100, :limit => 100) + # + def errors (wfid=nil) + + wfid, options = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ] + + errs = wfid.nil? ? + @context.storage.get_many('errors', nil, options) : + @context.storage.get_many('errors', wfid) + + return errs if options[:count] + + errs.collect { |err| ProcessError.new(err) } end + # Returns an array of schedules. Those schedules are open structs + # with various properties, like target, owner, at, put_at, ... + # + # Introduced mostly for ruote-kit. + # + # Can be called in two ways : + # + # engine.schedules(wfid) + # + # and + # + # engine.schedules(:skip => 100, :limit => 100) + # + def schedules (wfid=nil) + + wfid, options = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ] + + scheds = wfid.nil? ? + @context.storage.get_many('schedules', nil, options) : + @context.storage.get_many('schedules', /!#{wfid}-\d+$/) + + return scheds if options[:count] + + scheds.collect { |sched| Ruote.schedule_to_h(sched) } + end + + # Returns a [sorted] list of wfids of the process instances currently + # running in the engine. + # + # This operation is substantially less costly than Engine#processes (though + # the 'how substantially' depends on the storage chosen). + # + def process_wfids + + @context.storage.ids('expressions').collect { |sfei| + sfei.split('!').last + }.uniq.sort + end + # Shuts down the engine, mostly passes the shutdown message to the other # services and hope they'll shut down properly. # def shutdown @context.shutdown end - # This method expects there is a logger with a wait_for method in the + # This method expects there to be a logger with a wait_for method in the # context, else it will raise an exception. # + # *WARNING* : wait_for() is meant for environments where there is a unique + # worker and that worker is nested in this engine. In a multiple worker + # environment wait_for doesn't see events handled by 'other' workers. + # # This method is only useful for test/quickstart/examples environments. # # engine.wait_for(:alpha) # # will make the current thread block until a workitem is delivered # # to the participant named 'alpha' @@ -222,10 +380,18 @@ ) unless logger.respond_to?(:wait_for) logger.wait_for(items) end + # Joins the worker thread. If this engine has no nested worker, calling + # this method will simply return immediately. + # + def join + + worker.join if worker + end + # Loads and parses the process definition at the given path. # def load_definition (path) @context.parser.parse(path) @@ -291,15 +457,31 @@ # def cancel (fei, flavour) # # do nothing # end # end # - # engine.register_participant 'moon', MyStatelessParticipant, 'name' => 'saturn5' + # engine.register_participant( + # 'moon', MyStatelessParticipant, 'name' => 'saturn5') # # Remember that the options (the hash that follows the class name), must be # serialisable via JSON. # + # + # == require_path and load_path + # + # It's OK to register a participant by passing its full classname as a + # String. + # + # engine.register_participant( + # 'auditor', 'AuditParticipant', 'require_path' => 'part/audit.rb') + # engine.register_participant( + # 'auto_decision', 'DecParticipant', 'load_path' => 'part/dec.rb') + # + # Note the option load_path / require_path that point to the ruby file + # containing the participant implementation. 'require' will load and eval + # the ruby code only once, 'load' each time. + # def register_participant (regex, participant=nil, opts={}, &block) pa = @context.plist.register(regex, participant, opts, block) @context.storage.put_msg( @@ -308,10 +490,34 @@ 'engine_worker_only' => (pa != nil)) pa end + # A shorter version of #register_participant + # + # engine.register 'alice', MailParticipant, :target => 'alice@example.com' + # + # or a block registering mechanism. + # + # engine.register do + # alpha 'Participants::Alpha', 'flavour' => 'vanilla' + # participant 'bravo', 'Participants::Bravo', :flavour => 'peach' + # catchall ParticipantCharlie, 'flavour' => 'coconut' + # end + # + # Originally implemented in ruote-kit by Torsten Schoenebaum. + # + def register (*args, &block) + + if args.size > 0 + register_participant(*args, &block) + else + proxy = ParticipantRegistrationProxy.new(self) + block.arity < 1 ? proxy.instance_eval(&block) : block.call(proxy) + end + end + # Removes/unregisters a participant from the engine. # def unregister_participant (name_or_participant) re = @context.plist.unregister(name_or_participant) @@ -321,10 +527,59 @@ @context.storage.put_msg( 'participant_unregistered', 'regex' => re.to_s) end + alias :unregister :unregister_participant + + # Returns a list of Ruote::ParticipantEntry instances. + # + # engine.register_participant :alpha, MyParticipant, 'message' => 'hello' + # + # # interrogate participant list + # # + # list = engine.participant_list + # participant = list.first + # p participant.regex + # # => "^alpha$" + # p participant.classname + # # => "MyParticipant" + # p participant.options + # # => {"message"=>"hello"} + # + # # update participant list + # # + # participant.regex = '^alfred$' + # engine.participant_list = list + # + def participant_list + + @context.plist.list + end + + # Accepts a list of Ruote::ParticipantEntry instances. + # + # See Engine#participant_list + # + def participant_list= (pl) + + @context.plist.list = pl + end + + # A convenience method for + # + # sp = Ruote::StorageParticipant.new(engine) + # + # simply do + # + # sp = engine.storage_participant + # + def storage_participant + + @storage_participant ||= Ruote::StorageParticipant.new(self) + end + # Adds a service locally (will not get propagated to other workers). # # tracer = Tracer.new # @engine.add_service('tracer', tracer) # @@ -371,10 +626,68 @@ fexp = Ruote::Exp::FlowExpression.fetch( @context, Ruote::FlowExpressionId.extract_h(fei)) Ruote::Workitem.new(fexp.h.applied_workitem) end + + # A debug helper : + # + # engine.noisy = true + # + # will let the engine (in fact the worker) pour all the details of the + # executing process instances to STDOUT. + # + def noisy= (b) + + @context.logger.noisy = b + end + + protected + + # Used by #process and #processes + # + def list_processes (wfids, opts) + + swfids = wfids ? wfids.collect { |wfid| /!#{wfid}-\d+$/ } : nil + + exps = @context.storage.get_many('expressions', wfids) + swis = @context.storage.get_many('workitems', wfids) + errs = @context.storage.get_many('errors', wfids) + schs = @context.storage.get_many('schedules', swfids) + + errs = errs.collect { |err| ProcessError.new(err) } + schs = schs.collect { |sch| Ruote.schedule_to_h(sch) } + + by_wfid = {} + + exps.each do |exp| + (by_wfid[exp['fei']['wfid']] ||= [ [], [], [], [] ])[0] << exp + end + swis.each do |swi| + (by_wfid[swi['fei']['wfid']] ||= [ [], [], [], [] ])[1] << swi + end + errs.each do |err| + (by_wfid[err.wfid] ||= [ [], [], [], [] ])[2] << err + end + schs.each do |sch| + (by_wfid[sch['wfid']] ||= [ [], [], [], [] ])[3] << sch + end + + wfids = if wfids + wfids + else + wfids = by_wfid.keys.sort + wfids = wfids.reverse if opts[:descending] + wfids + end + + wfids.inject([]) { |a, wfid| + info = by_wfid[wfid] + a << ProcessStatus.new(@context, *info) if info + a + } + end end # # A wrapper class giving easy access to engine variables. # @@ -395,8 +708,62 @@ def []= (k, v) @storage.put_engine_variable(k, v) end + end + + # + # Engine#register uses this proxy when it's passed a block. + # + # Originally written by Torsten Schoenebaum for ruote-kit. + # + class ParticipantRegistrationProxy + + def initialize (engine) + + @engine = engine + end + + def participant (name, klass, options={}) + + @engine.register_participant(name, klass, options) + end + + def catchall (*args) + + klass = args.empty? ? Ruote::StorageParticipant : args.first + options = args[1] || {} + + participant('.+', klass, options) + end + + # Maybe a bit audacious... + # + def method_missing (method_name, *args) + + participant(method_name, *args) + end + end + + # Refines a schedule as found in the ruote storage into something a bit + # easier to present. + # + def self.schedule_to_h (sched) + + h = sched.dup + + h.delete('_rev') + h.delete('type') + msg = h.delete('msg') + owner = h.delete('owner') + + h['wfid'] = owner['wfid'] + h['action'] = msg['action'] + h['type'] = msg['flavour'] + h['owner'] = Ruote::FlowExpressionId.new(owner) + h['target'] = Ruote::FlowExpressionId.new(msg['fei']) + + h end end