lib/ruote/svc/dispatch_pool.rb in ruote-2.2.0 vs lib/ruote/svc/dispatch_pool.rb in ruote-2.3.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -40,65 +40,47 @@
end
def handle(msg)
case msg['action']
- when 'dispatch'
- dispatch(msg)
- when 'dispatch_cancel'
- dispatch_cancel(msg)
- else
- # simply discard the message
+ when 'dispatch' then dispatch(msg)
+ when 'dispatch_cancel' then dispatch_cancel(msg)
+ when 'dispatch_pause', 'dispatch_resume' then dispatch_pause(msg)
+ else # simply discard the message
end
end
protected
- def dispatch_cancel(msg)
-
- flavour = msg['flavour']
-
- participant = @context.plist.instantiate(msg['participant'])
-
- begin
- participant.cancel(Ruote::FlowExpressionId.new(msg['fei']), flavour)
- rescue => e
- raise(e) if flavour != 'kill'
- end
-
- @context.storage.put_msg(
- 'reply',
- 'fei' => msg['fei'],
- 'workitem' => msg['workitem'])
- end
-
def dispatch(msg)
participant = @context.plist.lookup(
msg['participant'] || msg['participant_name'], msg['workitem'])
- if do_not_thread(participant, msg)
+ if (@context['participant_threads_enabled'] == false) || do_not_thread?(participant, msg)
do_dispatch(participant, msg)
else
do_threaded_dispatch(participant, msg)
end
end
- # The actual dispatching (call to Participant#consume).
+ # The actual dispatching (call to Participant#consume or #on_workitem).
#
def do_dispatch(participant, msg)
workitem = Ruote::Workitem.new(msg['workitem'])
workitem.fields['dispatched_at'] = Ruote.now_to_utc_s
- participant.consume(workitem)
+ Ruote.participant_send(
+ participant, [ :on_workitem, :consume ], 'workitem' => workitem)
@context.storage.put_msg(
'dispatched',
'fei' => msg['fei'],
- 'participant_name' => workitem.participant_name)
+ 'participant_name' => workitem.participant_name,
+ 'workitem' => msg['workitem'])
# once the consume is done, asynchronously flag the
# participant expression as 'dispatched'
end
# Wraps the call to do_dispatch in a thread.
@@ -127,18 +109,105 @@
end
# Returns true if the participant doesn't want the #consume to happen
# in a new Thread.
#
- def do_not_thread(participant, msg)
+ def do_not_thread?(participant, msg)
- return false unless participant.respond_to?(:do_not_thread)
+ # :default => false makes participant_send return false if no method
+ # were found (else it would raise a NoMethodError)
- if participant.method(:do_not_thread).arity == 0
- participant.do_not_thread
- else
- participant.do_not_thread(Ruote::Workitem.new(msg['workitem']))
+ Ruote.participant_send(
+ participant,
+ [ :do_not_thread, :do_not_thread?, :dont_thread, :dont_thread? ],
+ 'workitem' => Ruote::Workitem.new(msg['workitem']), :default => false)
+ end
+
+ # Instantiates the participant and calls its cancel method.
+ #
+ def dispatch_cancel(msg)
+
+ flavour = msg['flavour']
+
+ participant = @context.plist.instantiate(msg['participant'])
+
+ result = begin
+
+ Ruote.participant_send(
+ participant,
+ [ :on_cancel, :cancel ],
+ 'fei' => Ruote::FlowExpressionId.new(msg['fei']),
+ 'flavour' => flavour)
+
+ rescue => e
+ raise(e) if flavour != 'kill'
end
+
+ @context.storage.put_msg(
+ 'reply',
+ 'fei' => msg['fei'],
+ 'workitem' => msg['workitem']
+ ) if result != false
end
+
+ # Instantiates the participant and calls its on_pause (or on_resume) method.
+ #
+ def dispatch_pause(msg)
+
+ action = (msg['action'] == 'dispatch_resume' ? :on_resume : :on_pause)
+
+ participant = @context.plist.instantiate(
+ msg['participant'], :if_respond_to? => action)
+
+ return unless participant
+
+ Ruote.participant_send(
+ participant,
+ action,
+ 'fei' => Ruote::FlowExpressionId.new(msg['fei']), :default => false)
+ end
+ end
+
+ # Given a participant, a method name or an array of method names and
+ # a hash of arguments, will do its best to set the instance variables
+ # corresponding to the arguments (if possible) and to call the
+ # method with the right number of arguments...
+ #
+ # Made it a Ruote module method so that RevParticipant might use it
+ # independently.
+ #
+ # If the arguments hash contains a value keyed :default, that value is
+ # returned when none of the methods is responded to by the participant.
+ # Else if :default is not set or is set to nil, a NoMethodError.
+ #
+ def self.participant_send(participant, methods, arguments)
+
+ default = arguments.delete(:default)
+
+ # set instance variables if possible
+
+ arguments.each do |key, value|
+ setter = "#{key}="
+ participant.send(setter, value) if participant.respond_to?(setter)
+ end
+
+ # call the method, with the right arity
+
+ Array(methods).each do |method|
+
+ next unless participant.respond_to?(method)
+
+ return participant.send(method) if participant.method(method).arity == 0
+
+ args = arguments.keys.sort.collect { |k| arguments[k] }
+ # luckily, our arg keys are in the alphabetical order (fei, flavour)
+
+ return participant.send(method, *args)
+ end
+
+ return default unless default == nil
+
+ raise NoMethodError.new(
+ "undefined method `#{methods.first}' for #{participant.class}")
end
end