lib/racecar/runner.rb in racecar-2.0.0.beta3 vs lib/racecar/runner.rb in racecar-2.0.0.beta4
- old
+ new
@@ -164,15 +164,20 @@
value: message.payload,
headers: message.headers
}
@instrumenter.instrument("start_process_message", instrumentation_payload)
- @instrumenter.instrument("process_message", instrumentation_payload) do
- with_pause(message.topic, message.partition, message.offset..message.offset) do
- processor.process(Racecar::Message.new(message))
- processor.deliver!
- consumer.store_offset(message)
+ with_pause(message.topic, message.partition, message.offset..message.offset) do
+ begin
+ @instrumenter.instrument("process_message", instrumentation_payload) do
+ processor.process(Racecar::Message.new(message))
+ processor.deliver!
+ consumer.store_offset(message)
+ end
+ rescue => e
+ config.error_handler.call(e, instrumentation_payload)
+ raise e
end
end
end
def process_batch(messages)
@@ -181,18 +186,24 @@
consumer_class: processor.class.to_s,
topic: first.topic,
partition: first.partition,
first_offset: first.offset,
last_offset: last.offset,
+ last_create_time: last.timestamp,
message_count: messages.size
}
@instrumenter.instrument("start_process_batch", instrumentation_payload)
@instrumenter.instrument("process_batch", instrumentation_payload) do
with_pause(first.topic, first.partition, first.offset..last.offset) do
- processor.process_batch(messages.map {|message| Racecar::Message.new(message) })
- processor.deliver!
- consumer.store_offset(messages.last)
+ begin
+ processor.process_batch(messages.map {|message| Racecar::Message.new(message) })
+ processor.deliver!
+ consumer.store_offset(messages.last)
+ rescue => e
+ config.error_handler.call(e, instrumentation_payload)
+ raise e
+ end
end
end
end
def with_pause(topic, partition, offsets)