lib/ruote/receiver/base.rb in ruote-2.2.0 vs lib/ruote/receiver/base.rb in ruote-2.3.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -27,11 +27,11 @@
#
# The core methods for the Receiver class (sometimes a Mixin is easier
# to integrate).
#
- # (The engine itself includes this mixin, the LocalParticipant module
+ # (The dashboard itself includes this mixin, the LocalParticipant module
# includes it as well).
#
module ReceiverMixin
# This method pipes back a workitem into the engine, letting it resume
@@ -47,58 +47,103 @@
'workitem' => workitem,
'participant_name' => workitem['participant_name'],
'receiver' => sign)
end
+ # Wraps a call to receive(workitem)
+ #
+ # Not aliasing so that if someone changes the receive implementation,
+ # reply is affected as well.
+ #
+ def reply(workitem)
+
+ receive(workitem)
+ end
+
+ # Can be used to raise an error in the workflow instance.
+ #
+ # Can be called either with an error class and arguments, either with
+ # an error instance (and no arguments).
+ #
+ # The workitem can be either an instance of Ruote::Workitem or a workitem
+ # in its Hash representation.
+ #
+ # receiver.flunk(workitem, ArgumentError, "not enough info")
+ #
+ # rescue => e
+ # receiver.flunk(workitem, e)
+ # end
+ #
+ def flunk(workitem, error_class_or_instance_or_message, *err_arguments)
+
+ err = error_class_or_instance_or_message
+
+ if err.is_a?(String)
+ err = RuntimeError.new(err)
+ err.set_backtrace(caller)
+
+ elsif err.is_a?(Class)
+ err = err.new(*err_arguments)
+ err.set_backtrace(caller)
+ end
+
+ workitem = workitem.h if workitem.respond_to?(:h)
+
+ @context.storage.put_msg(
+ 'raise',
+ 'fei' => workitem['fei'],
+ 'wfid' => workitem['wfid'],
+ 'msg' => {
+ 'action' => 'dispatch',
+ 'fei' => workitem['fei'],
+ 'participant_name' => workitem['participant_name'],
+ 'participant' => nil,
+ 'workitem' => workitem
+ },
+ 'error' => {
+ 'class' => err.class.name,
+ 'message' => err.message,
+ 'trace' => err.backtrace
+ })
+ end
+
# Given a process definitions and optional initial fields and variables,
# launches a new process instance.
#
- # This method is mostly used from the Ruote::Engine class (which includes
- # this mixin).
+ # This method is mostly used from the Ruote::Dashboard class (which
+ # includes this mixin).
#
# process_definition must be a result of Ruote.process_definition call
# or XML or JSON serialized process definition, as accepted by
# Ruote::Reader#read.
#
# fields are workflow parameters that will be placed in workitem.fields.
#
- # variables contain engine variables.
+ # Calls to this method returns the newly launched "workflow instance id"
+ # ("wfid" for short), the [hopefully] unique identifier for the
+ # process instance.
#
- def launch(process_definition, fields={}, variables={})
+ # == custom :wfid
+ #
+ # When calling this method, it's OK to pass a field named :wfid (Symbol,
+ # not String) that will be used as the identifier for the process instance.
+ #
+ def launch(process_definition, fields={}, variables={}, root_stash=nil)
- wfid = @context.wfidgen.generate
+ wfid = fields[:wfid] || @context.wfidgen.generate
@context.storage.put_msg(
'launch',
'wfid' => wfid,
'tree' => @context.reader.read(process_definition),
'workitem' => { 'fields' => fields },
- 'variables' => variables)
+ 'variables' => variables,
+ 'stash' => root_stash)
wfid
end
- # Wraps a call to receive(workitem)
- #
- # Not aliasing so that if someone changes the receive implementation,
- # reply is affected as well.
- #
- def reply(workitem)
-
- receive(workitem)
- end
-
- # Wraps a call to receive(workitem)
- #
- # Not aliasing so that if someone changes the receive implementation,
- # reply_to_engine is affected as well.
- #
- def reply_to_engine(workitem)
-
- receive(workitem)
- end
-
# A receiver signs a workitem when it comes back.
#
# Not used much as of now.
#
def sign
@@ -114,17 +159,12 @@
Ruote::Exp::FlowExpression.fetch(
@context,
Ruote::FlowExpressionId.extract_h(workitem_or_fei))
end
- # For example :
- #
- # fexp = engine.fexp(fei)
- # # or
- # fexp = engine.fexp(workitem)
- #
alias fexp fetch_flow_expression
+ alias flow_expression fetch_flow_expression
# A convenience methods for advanced users (like Oleg).
#
# Given a fei (flow expression id), fetches the workitem as stored in
# the expression with that fei.
@@ -138,16 +178,18 @@
# FlowExpressionId instance or a hash.
#
# on_terminate processes are not triggered for on_error processes.
# on_error processes are triggered for on_terminate processes as well.
#
- def applied_workitem(fei)
+ def fetch_workitem(fexp_or_fei)
- Ruote::Workitem.new(fexp(fei).h.applied_workitem)
+ Ruote::Workitem.new(flow_expression(fexp_or_fei).h.applied_workitem)
end
- alias workitem applied_workitem
+ alias workitem fetch_workitem
+ alias applied_workitem fetch_workitem
+
protected
# Stashes values in the participant expression (in the storage).
#
# put(workitem.fei, 'key' => 'value', 'colour' => 'blue')
@@ -159,19 +201,29 @@
#
# See the thread at
# http://groups.google.com/group/openwferu-users/t/2e6a95708c10847b for the
# justification.
#
- def put(fei, hash)
+ def stash_put(workitem_or_fei, key, value=nil)
- fexp = Ruote::Exp::FlowExpression.fetch(@context, fei.to_h)
+ hash = key.is_a?(Hash) ? key : { key => value }
- (fexp.h['stash'] ||= {}).merge!(hash)
+ exp = fetch_flow_expression(workitem_or_fei)
- fexp.persist_or_raise
+ (exp.h['stash'] ||= {}).merge!(hash)
+
+ r = exp.try_persist
+
+ return hash if r == nil
+ return stash_put(workitem_or_fei, key, value) if r != true
+
+ fei = Ruote::FlowExpressionId.extract(workitem_or_fei).sid rescue 'xxx'
+ raise ArgumentError.new("failed to put, expression #{fei} is gone")
end
+ alias put stash_put
+
# Fetches back a stashed value.
#
# get(fei, 'colour')
# # => 'blue'
#
@@ -181,17 +233,18 @@
# # => { 'colour' => 'blue' }
#
# put & get are useful for a participant that needs to communicate
# between its consume and its cancel.
#
- def get(fei, key=nil)
+ def stash_get(workitem_or_fei, key=nil)
- fexp = Ruote::Exp::FlowExpression.fetch(@context, fei.to_h)
+ stash = fetch_flow_expression(workitem_or_fei).h['stash'] rescue nil
+ stash ||= {}
- stash = fexp.h['stash'] rescue {}
-
key ? stash[key] : stash
end
+
+ alias get stash_get
end
#
# A receiver is meant to receive workitems and feed them back into the
# engine (the storage actually).