lib/ruote/dashboard.rb in ruote-2.3.0.1 vs lib/ruote/dashboard.rb in ruote-2.3.0.2

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2013, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -23,10 +23,11 @@ #++ require 'ruote/context' require 'ruote/util/ometa' require 'ruote/receiver/base' +require 'ruote/dboard/mutation' require 'ruote/dboard/process_status' module Ruote @@ -176,10 +177,52 @@ 'stash' => root_stash) wfid end + # Given a flow expression id, locates the corresponding ruote + # expression and attaches a subprocess to it. + # + # Accepts the fei as a Hash or as a FlowExpressionId instance. + # + # By default, the workitem of the expression you attach to provides + # the initial workitem for the attached branch. By using the + # :fields/:workitem or :merge_fields options, one can change that. + # + # Returns the fei of the attached [root] expression + # (as a FlowExpressionId instance). + # + def attach(fei_or_fe, definition, opts={}) + + fe = Ruote.extract_fexp(@context, fei_or_fe).h + fei = fe['fei'] + + cfei = fei.merge( + 'expid' => "#{fei['expid']}_0", + 'subid' => Ruote.generate_subid(fei.inspect)) + + tree = @context.reader.read(definition) + tree[0] = 'sequence' + + fields = fe['applied_workitem']['fields'] + if fs = opts[:fields] || opts[:workitem] + fields = fs + elsif fs = opts[:merge_fields] + fields.merge!(fs) + end + + @context.storage.put_msg( + 'launch', # "apply" is OK, but "launch" stands out better + 'parent_id' => fei, + 'fei' => cfei, + 'tree' => tree, + 'workitem' => { 'fields' => fields }, + 'attached' => true) + + Ruote::FlowExpressionId.new(cfei) + end + # Given a workitem or a fei, will do a cancel_expression, # else it's a wfid and it does a cancel_process. # # == A note about opts # @@ -323,10 +366,15 @@ # :fields is used to replace the fields of the workitem at re_apply # # dashboard.re_apply( # fei, :fields => { 'customer' => 'bob' }) # + # :workitem is ok too + # + # dashboard.re_apply( + # fei, :workitem => { 'fields' => { 'customer' => 'bob' } }) + # # :merge_in_fields is used to add / override fields # # dashboard.re_apply( # fei, :merge_in_fields => { 'customer' => 'bob' }) # @@ -336,10 +384,32 @@ 'cancel', 'fei' => FlowExpressionId.extract_h(fei), 're_apply' => Ruote.keys_to_s(opts)) end + # Returns a Mutation instance listing all the operations necessary + # to transform the current tree of the process (wfid) into + # the given definition tree (pdef). + # + # See also #apply_mutation + # + def compute_mutation(wfid, pdef) + + Mutation.new(self, wfid, @context.reader.read(pdef)) + end + + # Computes mutation and immediately applies it... + # + # See #compute_mutation + # + # Return the mutation instance (forensic?) + # + def apply_mutation(wfid, pdef) + + Mutation.new(self, wfid, @context.reader.read(pdef)).apply + end + # This method re_apply all the leaves of a process instance. It's meant # to be used against stalled workflows to give them back the spark of # life. # # Stalled workflows can happen when msgs get lost. It also happens with @@ -674,15 +744,15 @@ # # class MyParticipant # def initialize(opts) # @name = opts['name'] # end - # def consume(workitem) + # def on_workitem # workitem.fields['rocket_name'] = @name # send_to_the_moon(workitem) # end - # def cancel(fei, flavour) + # def on_cancel # # do nothing # end # end # # dashboard.register_participant( @@ -691,16 +761,19 @@ # # computing the total for a invoice being passed in the workitem. # # # class TotalParticipant # include Ruote::LocalParticipant # - # def consume(workitem) + # def on_workitem # workitem['total'] = workitem.fields['items'].inject(0.0) { |t, item| # t + item['count'] * PricingService.lookup(item['id']) # } - # reply_to_engine(workitem) + # reply # end + # + # def on_cancel + # end # end # dashboard.register_participant 'total', TotalParticipant # # Remember that the options (the hash that follows the class name), must be # serializable via JSON. @@ -1075,30 +1148,113 @@ # on_terminate processes are not triggered for on_error processes. # on_error processes are triggered for on_terminate processes as well. # def on_terminate=(target) + msg = { + 'action' => 'launch', + 'tree' => target.is_a?(String) ? + [ 'define', {}, [ [ target, {}, [] ] ] ] : target, + 'workitem' => 'replace' } + @context.tracker.add_tracker( - nil, # do not track a specific wfid - 'terminated', # react on 'error_intercepted' msgs - 'on_terminate', # the identifier - nil, # no specific condition - { 'action' => 'launch', - 'tree' => target.is_a?(String) ? - [ 'define', {}, [ [ target, {}, [] ] ] ] : target, - 'workitem' => 'replace' }) + nil, # do not track a specific wfid + 'terminated', # react on 'error_intercepted' msgs + 'on_terminate', # the identifier + nil, # no specific condition + msg) # the message that gets triggered end + # /!\ warning: advanced method. + # + # Adds a tracker to the ruote engine. + # + # === Arguments + # + # * wfid: + # When nil will track any workflow execution, when set will only + # react on msgs for the given wfid. + # * action: + # A string like "apply", "reply" or "receive", the action being tracked + # May begin with a "pre_" prefix. + # * tracker_id: + # When nil, ruote chooses a tracker_id, else its the unique identifier + # for the new tracker. + # * conditions: + # A Hash of keys pointing to arrays of expected values. + # For example { 'tree.0' ~=> [ 'alfred', 'knuth' ] } will trigger + # if the first element of msg['tree'] equals alfred or knuth. + # * msg: + # The msg to place in the msg queue if the tracker matches the msg, + # the reaction. + # + # Returns the tracker_id. + # + def add_tracker(wfid, action, tracker_id, conditions, msg) + + @context.tracker.add_tracker(wfid, action, tracker_id, conditions, msg) + end + + # /!\ warning: advanced method. + # + # Removes a tracker from the ruote system. + # + # The first arg is a FlowExpressionId, in its instance form, hash form or + # shortened (sid) string form. It can also be any string (any tracker id). + # + # The second arg is optional, it's a wfid. It's useful for some storage + # implementations (like ruote-swf) and helps determine how to grab + # the tracker list. Most of the ruote deployments don't need that arg set. + # + def remove_tracker(fei_sid_or_id, wfid=nil) + + @context.tracker.remove_tracker(fei_sid_or_id, wfid) + end + + # Returns a hash { tracker_id => tracker_hash } enumerating all + # the trackers in the ruote system. + # + def get_trackers(wfid=nil) + + @context.storage.get_trackers(wfid)['trackers'] + end + # A debug helper : # # dashboard.noisy = true # # will let the dashboard (in fact the worker) pour all the details of the # executing process instances to STDOUT. # def noisy=(b) @context.logger.noisy = b + end + + # Warning: advanced method. + # + # Currently only used by mutations. + # + def update_expression(fei, opts) + + fei = Ruote.extract_fei(fei) + fexp = Ruote::Exp::FlowExpression::fetch(@context, fei) + + raise ArgumentError.new( + "no expression found with fei #{fei.sid}" + ) unless fexp + + if t = opts[:tree] + fexp.h.updated_tree = opts[:tree] + end + + r = @context.storage.put(fexp.h) + + raise ArgumentError.new( + "expression #{fei.sid} is gone" + ) if r == true + + return update_expression(fei, opts) unless r.nil? end protected # Used by #pause and #resume.