lib/dispatch-rider/demultiplexer.rb in dispatch-rider-0.0.7 vs lib/dispatch-rider/demultiplexer.rb in dispatch-rider-0.1.0
- old
+ new
@@ -3,34 +3,55 @@
# Demultiplexer#start defines an event loop which pops items from the queue
# and passes it on to the dispatcher for dispatching to the appropriate message handler.
# The demultiplexer can be stopped by calling the Demultiplexer#stop method.
module DispatchRider
class Demultiplexer
- attr_reader :queue, :dispatcher
+ attr_reader :queue, :dispatcher, :error_handler
- def initialize(queue, dispatcher)
+ def initialize(queue, dispatcher, error_handler)
@queue = queue
@dispatcher = dispatcher
+ @error_handler = error_handler
@continue = true
end
def start
- catch(:done) do
- loop do
- throw :done unless @continue
- queue.pop do |message|
- dispatch_message(message)
- end
+ do_loop do
+ begin
+ handle_next_queue_item
+ rescue Exception => exception
+ error_handler.call(Message.new(subject: "TopLevelError", body: {}), exception)
+ throw :done
end
end
self
end
def stop
@continue = false
end
+ private
+
def dispatch_message(message)
dispatcher.dispatch(message)
+ rescue Exception => exception
+ error_handler.call(message, exception)
end
+
+ def do_loop
+ catch(:done) do
+ loop do
+ throw :done unless @continue
+ yield
+ end
+ end
+ end
+
+ def handle_next_queue_item
+ queue.pop do |message|
+ dispatch_message(message)
+ end
+ end
+
end
end