lib/ruote/svc/dispatch_pool.rb in ruote-2.3.0.1 vs lib/ruote/svc/dispatch_pool.rb in ruote-2.3.0.2
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2013, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -39,35 +39,40 @@
@context = context
end
def handle(msg)
- case msg['action']
- when 'dispatch' then dispatch(msg)
- when 'dispatch_cancel' then dispatch_cancel(msg)
- when 'dispatch_pause', 'dispatch_resume' then dispatch_pause(msg)
- else # simply discard the message
- end
+ return unless msg['action'].match(/^dispatch/)
+
+ send(msg['action'], msg)
end
protected
+ # Dispatching the msg.
+ #
def dispatch(msg)
participant = @context.plist.lookup(
msg['participant'] || msg['participant_name'], msg['workitem'])
- if (@context['participant_threads_enabled'] == false) || do_not_thread?(participant, msg)
+ if
+ @context['participant_threads_enabled'] == false ||
+ do_not_thread?(participant, msg)
+ then
do_dispatch(participant, msg)
else
do_threaded_dispatch(participant, msg)
end
end
# The actual dispatching (call to Participant#consume or #on_workitem).
#
- def do_dispatch(participant, msg)
+ # No error rescuing so it might be interesting for some extension
+ # classes (like in ruote-swf).
+ #
+ def do_raw_dispatch(participant, msg)
workitem = Ruote::Workitem.new(msg['workitem'])
workitem.fields['dispatched_at'] = Ruote.now_to_utc_s
@@ -81,10 +86,20 @@
'workitem' => msg['workitem'])
# once the consume is done, asynchronously flag the
# participant expression as 'dispatched'
end
+ # The raw dispatch work, wrapped in error handling.
+ #
+ def do_dispatch(participant, msg)
+
+ do_raw_dispatch(participant, msg)
+
+ rescue => err
+ @context.error_handler.msg_handle(msg, err)
+ end
+
# Wraps the call to do_dispatch in a thread.
#
def do_threaded_dispatch(participant, msg)
msg = Rufus::Json.dup(msg)
@@ -95,19 +110,11 @@
# Maybe at some point a limit on the number of dispatch threads
# would be OK.
# Or maybe it's the job of an extension / subclass
- Thread.new do
- begin
-
- do_dispatch(participant, msg)
-
- rescue => exception
- @context.error_handler.msg_handle(msg, exception)
- end
- end
+ Thread.new { do_dispatch(participant, msg) }
end
# Returns true if the participant doesn't want the #consume to happen
# in a new Thread.
#
@@ -163,9 +170,13 @@
Ruote.participant_send(
participant,
action,
'fei' => Ruote::FlowExpressionId.new(msg['fei']), :default => false)
end
+
+ # Route to dispatch_pause which handles both pause and resume.
+ #
+ alias dispatch_resume dispatch_pause
end
# Given a participant, a method name or an array of method names and
# a hash of arguments, will do its best to set the instance variables
# corresponding to the arguments (if possible) and to call the