lib/ruote/svc/dispatch_pool.rb in ruote-2.1.11 vs lib/ruote/svc/dispatch_pool.rb in ruote-2.2.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -32,85 +32,112 @@
# Can be extended/replaced for better handling of Thread (why not something
# like a thread pool or no threads at all).
#
class DispatchPool
- def initialize (context)
+ def initialize(context)
@context = context
end
- def handle (msg)
+ def handle(msg)
case msg['action']
when 'dispatch'
dispatch(msg)
when 'dispatch_cancel'
dispatch_cancel(msg)
else
- # simply discard message
+ # simply discard the message
end
end
protected
- def dispatch_cancel (msg)
+ def dispatch_cancel(msg)
flavour = msg['flavour']
participant = @context.plist.instantiate(msg['participant'])
begin
participant.cancel(Ruote::FlowExpressionId.new(msg['fei']), flavour)
- rescue Exception => e
+ rescue => e
raise(e) if flavour != 'kill'
end
@context.storage.put_msg(
'reply',
'fei' => msg['fei'],
'workitem' => msg['workitem'])
end
- def dispatch (msg)
+ def dispatch(msg)
participant = @context.plist.lookup(
msg['participant'] || msg['participant_name'], msg['workitem'])
- if participant.respond_to?(:do_not_thread) && participant.do_not_thread
+ if do_not_thread(participant, msg)
do_dispatch(participant, msg)
else
do_threaded_dispatch(participant, msg)
end
end
- def do_dispatch (participant, msg)
+ # The actual dispatching (call to Participant#consume).
+ #
+ def do_dispatch(participant, msg)
workitem = Ruote::Workitem.new(msg['workitem'])
workitem.fields['dispatched_at'] = Ruote.now_to_utc_s
participant.consume(workitem)
- @context.storage.put_msg('dispatched', 'fei' => msg['fei'])
- # once the consume is done, asynchronously flag the
- # participant expression as 'dispatched'
+ @context.storage.put_msg(
+ 'dispatched',
+ 'fei' => msg['fei'],
+ 'participant_name' => workitem.participant_name)
+ # once the consume is done, asynchronously flag the
+ # participant expression as 'dispatched'
end
- def do_threaded_dispatch (participant, msg)
+ # Wraps the call to do_dispatch in a thread.
+ #
+ def do_threaded_dispatch(participant, msg)
+ msg = Rufus::Json.dup(msg)
+ #
+ # the thread gets its own copy of the message
+ # (especially important if the main thread does something with
+ # the message 'during' the dispatch)
+
# Maybe at some point a limit on the number of dispatch threads
# would be OK.
# Or maybe it's the job of an extension / subclass
Thread.new do
begin
do_dispatch(participant, msg)
- rescue Exception => exception
+ rescue => exception
@context.error_handler.msg_handle(msg, exception)
end
+ end
+ end
+
+ # Returns true if the participant doesn't want the #consume to happen
+ # in a new Thread.
+ #
+ def do_not_thread(participant, msg)
+
+ return false unless participant.respond_to?(:do_not_thread)
+
+ if participant.method(:do_not_thread).arity == 0
+ participant.do_not_thread
+ else
+ participant.do_not_thread(Ruote::Workitem.new(msg['workitem']))
end
end
end
end