lib/ruote/receiver/base.rb in ruote-2.2.0 vs lib/ruote/receiver/base.rb in ruote-2.3.0

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -27,11 +27,11 @@ # # The core methods for the Receiver class (sometimes a Mixin is easier # to integrate). # - # (The engine itself includes this mixin, the LocalParticipant module + # (The dashboard itself includes this mixin, the LocalParticipant module # includes it as well). # module ReceiverMixin # This method pipes back a workitem into the engine, letting it resume @@ -47,58 +47,103 @@ 'workitem' => workitem, 'participant_name' => workitem['participant_name'], 'receiver' => sign) end + # Wraps a call to receive(workitem) + # + # Not aliasing so that if someone changes the receive implementation, + # reply is affected as well. + # + def reply(workitem) + + receive(workitem) + end + + # Can be used to raise an error in the workflow instance. + # + # Can be called either with an error class and arguments, either with + # an error instance (and no arguments). + # + # The workitem can be either an instance of Ruote::Workitem or a workitem + # in its Hash representation. + # + # receiver.flunk(workitem, ArgumentError, "not enough info") + # + # rescue => e + # receiver.flunk(workitem, e) + # end + # + def flunk(workitem, error_class_or_instance_or_message, *err_arguments) + + err = error_class_or_instance_or_message + + if err.is_a?(String) + err = RuntimeError.new(err) + err.set_backtrace(caller) + + elsif err.is_a?(Class) + err = err.new(*err_arguments) + err.set_backtrace(caller) + end + + workitem = workitem.h if workitem.respond_to?(:h) + + @context.storage.put_msg( + 'raise', + 'fei' => workitem['fei'], + 'wfid' => workitem['wfid'], + 'msg' => { + 'action' => 'dispatch', + 'fei' => workitem['fei'], + 'participant_name' => workitem['participant_name'], + 'participant' => nil, + 'workitem' => workitem + }, + 'error' => { + 'class' => err.class.name, + 'message' => err.message, + 'trace' => err.backtrace + }) + end + # Given a process definitions and optional initial fields and variables, # launches a new process instance. # - # This method is mostly used from the Ruote::Engine class (which includes - # this mixin). + # This method is mostly used from the Ruote::Dashboard class (which + # includes this mixin). # # process_definition must be a result of Ruote.process_definition call # or XML or JSON serialized process definition, as accepted by # Ruote::Reader#read. # # fields are workflow parameters that will be placed in workitem.fields. # - # variables contain engine variables. + # Calls to this method returns the newly launched "workflow instance id" + # ("wfid" for short), the [hopefully] unique identifier for the + # process instance. # - def launch(process_definition, fields={}, variables={}) + # == custom :wfid + # + # When calling this method, it's OK to pass a field named :wfid (Symbol, + # not String) that will be used as the identifier for the process instance. + # + def launch(process_definition, fields={}, variables={}, root_stash=nil) - wfid = @context.wfidgen.generate + wfid = fields[:wfid] || @context.wfidgen.generate @context.storage.put_msg( 'launch', 'wfid' => wfid, 'tree' => @context.reader.read(process_definition), 'workitem' => { 'fields' => fields }, - 'variables' => variables) + 'variables' => variables, + 'stash' => root_stash) wfid end - # Wraps a call to receive(workitem) - # - # Not aliasing so that if someone changes the receive implementation, - # reply is affected as well. - # - def reply(workitem) - - receive(workitem) - end - - # Wraps a call to receive(workitem) - # - # Not aliasing so that if someone changes the receive implementation, - # reply_to_engine is affected as well. - # - def reply_to_engine(workitem) - - receive(workitem) - end - # A receiver signs a workitem when it comes back. # # Not used much as of now. # def sign @@ -114,17 +159,12 @@ Ruote::Exp::FlowExpression.fetch( @context, Ruote::FlowExpressionId.extract_h(workitem_or_fei)) end - # For example : - # - # fexp = engine.fexp(fei) - # # or - # fexp = engine.fexp(workitem) - # alias fexp fetch_flow_expression + alias flow_expression fetch_flow_expression # A convenience methods for advanced users (like Oleg). # # Given a fei (flow expression id), fetches the workitem as stored in # the expression with that fei. @@ -138,16 +178,18 @@ # FlowExpressionId instance or a hash. # # on_terminate processes are not triggered for on_error processes. # on_error processes are triggered for on_terminate processes as well. # - def applied_workitem(fei) + def fetch_workitem(fexp_or_fei) - Ruote::Workitem.new(fexp(fei).h.applied_workitem) + Ruote::Workitem.new(flow_expression(fexp_or_fei).h.applied_workitem) end - alias workitem applied_workitem + alias workitem fetch_workitem + alias applied_workitem fetch_workitem + protected # Stashes values in the participant expression (in the storage). # # put(workitem.fei, 'key' => 'value', 'colour' => 'blue') @@ -159,19 +201,29 @@ # # See the thread at # http://groups.google.com/group/openwferu-users/t/2e6a95708c10847b for the # justification. # - def put(fei, hash) + def stash_put(workitem_or_fei, key, value=nil) - fexp = Ruote::Exp::FlowExpression.fetch(@context, fei.to_h) + hash = key.is_a?(Hash) ? key : { key => value } - (fexp.h['stash'] ||= {}).merge!(hash) + exp = fetch_flow_expression(workitem_or_fei) - fexp.persist_or_raise + (exp.h['stash'] ||= {}).merge!(hash) + + r = exp.try_persist + + return hash if r == nil + return stash_put(workitem_or_fei, key, value) if r != true + + fei = Ruote::FlowExpressionId.extract(workitem_or_fei).sid rescue 'xxx' + raise ArgumentError.new("failed to put, expression #{fei} is gone") end + alias put stash_put + # Fetches back a stashed value. # # get(fei, 'colour') # # => 'blue' # @@ -181,17 +233,18 @@ # # => { 'colour' => 'blue' } # # put & get are useful for a participant that needs to communicate # between its consume and its cancel. # - def get(fei, key=nil) + def stash_get(workitem_or_fei, key=nil) - fexp = Ruote::Exp::FlowExpression.fetch(@context, fei.to_h) + stash = fetch_flow_expression(workitem_or_fei).h['stash'] rescue nil + stash ||= {} - stash = fexp.h['stash'] rescue {} - key ? stash[key] : stash end + + alias get stash_get end # # A receiver is meant to receive workitems and feed them back into the # engine (the storage actually).