lib/ruote/dashboard.rb in ruote-2.3.0.1 vs lib/ruote/dashboard.rb in ruote-2.3.0.2
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2013, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -23,10 +23,11 @@
#++
require 'ruote/context'
require 'ruote/util/ometa'
require 'ruote/receiver/base'
+require 'ruote/dboard/mutation'
require 'ruote/dboard/process_status'
module Ruote
@@ -176,10 +177,52 @@
'stash' => root_stash)
wfid
end
+ # Given a flow expression id, locates the corresponding ruote
+ # expression and attaches a subprocess to it.
+ #
+ # Accepts the fei as a Hash or as a FlowExpressionId instance.
+ #
+ # By default, the workitem of the expression you attach to provides
+ # the initial workitem for the attached branch. By using the
+ # :fields/:workitem or :merge_fields options, one can change that.
+ #
+ # Returns the fei of the attached [root] expression
+ # (as a FlowExpressionId instance).
+ #
+ def attach(fei_or_fe, definition, opts={})
+
+ fe = Ruote.extract_fexp(@context, fei_or_fe).h
+ fei = fe['fei']
+
+ cfei = fei.merge(
+ 'expid' => "#{fei['expid']}_0",
+ 'subid' => Ruote.generate_subid(fei.inspect))
+
+ tree = @context.reader.read(definition)
+ tree[0] = 'sequence'
+
+ fields = fe['applied_workitem']['fields']
+ if fs = opts[:fields] || opts[:workitem]
+ fields = fs
+ elsif fs = opts[:merge_fields]
+ fields.merge!(fs)
+ end
+
+ @context.storage.put_msg(
+ 'launch', # "apply" is OK, but "launch" stands out better
+ 'parent_id' => fei,
+ 'fei' => cfei,
+ 'tree' => tree,
+ 'workitem' => { 'fields' => fields },
+ 'attached' => true)
+
+ Ruote::FlowExpressionId.new(cfei)
+ end
+
# Given a workitem or a fei, will do a cancel_expression,
# else it's a wfid and it does a cancel_process.
#
# == A note about opts
#
@@ -323,10 +366,15 @@
# :fields is used to replace the fields of the workitem at re_apply
#
# dashboard.re_apply(
# fei, :fields => { 'customer' => 'bob' })
#
+ # :workitem is ok too
+ #
+ # dashboard.re_apply(
+ # fei, :workitem => { 'fields' => { 'customer' => 'bob' } })
+ #
# :merge_in_fields is used to add / override fields
#
# dashboard.re_apply(
# fei, :merge_in_fields => { 'customer' => 'bob' })
#
@@ -336,10 +384,32 @@
'cancel',
'fei' => FlowExpressionId.extract_h(fei),
're_apply' => Ruote.keys_to_s(opts))
end
+ # Returns a Mutation instance listing all the operations necessary
+ # to transform the current tree of the process (wfid) into
+ # the given definition tree (pdef).
+ #
+ # See also #apply_mutation
+ #
+ def compute_mutation(wfid, pdef)
+
+ Mutation.new(self, wfid, @context.reader.read(pdef))
+ end
+
+ # Computes mutation and immediately applies it...
+ #
+ # See #compute_mutation
+ #
+ # Return the mutation instance (forensic?)
+ #
+ def apply_mutation(wfid, pdef)
+
+ Mutation.new(self, wfid, @context.reader.read(pdef)).apply
+ end
+
# This method re_apply all the leaves of a process instance. It's meant
# to be used against stalled workflows to give them back the spark of
# life.
#
# Stalled workflows can happen when msgs get lost. It also happens with
@@ -674,15 +744,15 @@
#
# class MyParticipant
# def initialize(opts)
# @name = opts['name']
# end
- # def consume(workitem)
+ # def on_workitem
# workitem.fields['rocket_name'] = @name
# send_to_the_moon(workitem)
# end
- # def cancel(fei, flavour)
+ # def on_cancel
# # do nothing
# end
# end
#
# dashboard.register_participant(
@@ -691,16 +761,19 @@
# # computing the total for a invoice being passed in the workitem.
# #
# class TotalParticipant
# include Ruote::LocalParticipant
#
- # def consume(workitem)
+ # def on_workitem
# workitem['total'] = workitem.fields['items'].inject(0.0) { |t, item|
# t + item['count'] * PricingService.lookup(item['id'])
# }
- # reply_to_engine(workitem)
+ # reply
# end
+ #
+ # def on_cancel
+ # end
# end
# dashboard.register_participant 'total', TotalParticipant
#
# Remember that the options (the hash that follows the class name), must be
# serializable via JSON.
@@ -1075,30 +1148,113 @@
# on_terminate processes are not triggered for on_error processes.
# on_error processes are triggered for on_terminate processes as well.
#
def on_terminate=(target)
+ msg = {
+ 'action' => 'launch',
+ 'tree' => target.is_a?(String) ?
+ [ 'define', {}, [ [ target, {}, [] ] ] ] : target,
+ 'workitem' => 'replace' }
+
@context.tracker.add_tracker(
- nil, # do not track a specific wfid
- 'terminated', # react on 'error_intercepted' msgs
- 'on_terminate', # the identifier
- nil, # no specific condition
- { 'action' => 'launch',
- 'tree' => target.is_a?(String) ?
- [ 'define', {}, [ [ target, {}, [] ] ] ] : target,
- 'workitem' => 'replace' })
+ nil, # do not track a specific wfid
+ 'terminated', # react on 'error_intercepted' msgs
+ 'on_terminate', # the identifier
+ nil, # no specific condition
+ msg) # the message that gets triggered
end
+ # /!\ warning: advanced method.
+ #
+ # Adds a tracker to the ruote engine.
+ #
+ # === Arguments
+ #
+ # * wfid:
+ # When nil will track any workflow execution, when set will only
+ # react on msgs for the given wfid.
+ # * action:
+ # A string like "apply", "reply" or "receive", the action being tracked
+ # May begin with a "pre_" prefix.
+ # * tracker_id:
+ # When nil, ruote chooses a tracker_id, else its the unique identifier
+ # for the new tracker.
+ # * conditions:
+ # A Hash of keys pointing to arrays of expected values.
+ # For example { 'tree.0' ~=> [ 'alfred', 'knuth' ] } will trigger
+ # if the first element of msg['tree'] equals alfred or knuth.
+ # * msg:
+ # The msg to place in the msg queue if the tracker matches the msg,
+ # the reaction.
+ #
+ # Returns the tracker_id.
+ #
+ def add_tracker(wfid, action, tracker_id, conditions, msg)
+
+ @context.tracker.add_tracker(wfid, action, tracker_id, conditions, msg)
+ end
+
+ # /!\ warning: advanced method.
+ #
+ # Removes a tracker from the ruote system.
+ #
+ # The first arg is a FlowExpressionId, in its instance form, hash form or
+ # shortened (sid) string form. It can also be any string (any tracker id).
+ #
+ # The second arg is optional, it's a wfid. It's useful for some storage
+ # implementations (like ruote-swf) and helps determine how to grab
+ # the tracker list. Most of the ruote deployments don't need that arg set.
+ #
+ def remove_tracker(fei_sid_or_id, wfid=nil)
+
+ @context.tracker.remove_tracker(fei_sid_or_id, wfid)
+ end
+
+ # Returns a hash { tracker_id => tracker_hash } enumerating all
+ # the trackers in the ruote system.
+ #
+ def get_trackers(wfid=nil)
+
+ @context.storage.get_trackers(wfid)['trackers']
+ end
+
# A debug helper :
#
# dashboard.noisy = true
#
# will let the dashboard (in fact the worker) pour all the details of the
# executing process instances to STDOUT.
#
def noisy=(b)
@context.logger.noisy = b
+ end
+
+ # Warning: advanced method.
+ #
+ # Currently only used by mutations.
+ #
+ def update_expression(fei, opts)
+
+ fei = Ruote.extract_fei(fei)
+ fexp = Ruote::Exp::FlowExpression::fetch(@context, fei)
+
+ raise ArgumentError.new(
+ "no expression found with fei #{fei.sid}"
+ ) unless fexp
+
+ if t = opts[:tree]
+ fexp.h.updated_tree = opts[:tree]
+ end
+
+ r = @context.storage.put(fexp.h)
+
+ raise ArgumentError.new(
+ "expression #{fei.sid} is gone"
+ ) if r == true
+
+ return update_expression(fei, opts) unless r.nil?
end
protected
# Used by #pause and #resume.