lib/dispatch-rider/demultiplexer.rb in dispatch-rider-0.0.7 vs lib/dispatch-rider/demultiplexer.rb in dispatch-rider-0.1.0

- old
+ new

@@ -3,34 +3,55 @@ # Demultiplexer#start defines an event loop which pops items from the queue # and passes it on to the dispatcher for dispatching to the appropriate message handler. # The demultiplexer can be stopped by calling the Demultiplexer#stop method. module DispatchRider class Demultiplexer - attr_reader :queue, :dispatcher + attr_reader :queue, :dispatcher, :error_handler - def initialize(queue, dispatcher) + def initialize(queue, dispatcher, error_handler) @queue = queue @dispatcher = dispatcher + @error_handler = error_handler @continue = true end def start - catch(:done) do - loop do - throw :done unless @continue - queue.pop do |message| - dispatch_message(message) - end + do_loop do + begin + handle_next_queue_item + rescue Exception => exception + error_handler.call(Message.new(subject: "TopLevelError", body: {}), exception) + throw :done end end self end def stop @continue = false end + private + def dispatch_message(message) dispatcher.dispatch(message) + rescue Exception => exception + error_handler.call(message, exception) end + + def do_loop + catch(:done) do + loop do + throw :done unless @continue + yield + end + end + end + + def handle_next_queue_item + queue.pop do |message| + dispatch_message(message) + end + end + end end