lib/racecar/runner.rb in racecar-2.0.0.beta3 vs lib/racecar/runner.rb in racecar-2.0.0.beta4

- old
+ new

@@ -164,15 +164,20 @@ value: message.payload, headers: message.headers } @instrumenter.instrument("start_process_message", instrumentation_payload) - @instrumenter.instrument("process_message", instrumentation_payload) do - with_pause(message.topic, message.partition, message.offset..message.offset) do - processor.process(Racecar::Message.new(message)) - processor.deliver! - consumer.store_offset(message) + with_pause(message.topic, message.partition, message.offset..message.offset) do + begin + @instrumenter.instrument("process_message", instrumentation_payload) do + processor.process(Racecar::Message.new(message)) + processor.deliver! + consumer.store_offset(message) + end + rescue => e + config.error_handler.call(e, instrumentation_payload) + raise e end end end def process_batch(messages) @@ -181,18 +186,24 @@ consumer_class: processor.class.to_s, topic: first.topic, partition: first.partition, first_offset: first.offset, last_offset: last.offset, + last_create_time: last.timestamp, message_count: messages.size } @instrumenter.instrument("start_process_batch", instrumentation_payload) @instrumenter.instrument("process_batch", instrumentation_payload) do with_pause(first.topic, first.partition, first.offset..last.offset) do - processor.process_batch(messages.map {|message| Racecar::Message.new(message) }) - processor.deliver! - consumer.store_offset(messages.last) + begin + processor.process_batch(messages.map {|message| Racecar::Message.new(message) }) + processor.deliver! + consumer.store_offset(messages.last) + rescue => e + config.error_handler.call(e, instrumentation_payload) + raise e + end end end end def with_pause(topic, partition, offsets)