lib/racecar/runner.rb in racecar-2.9.0 vs lib/racecar/runner.rb in racecar-2.10.0.beta1
- old
+ new
@@ -94,11 +94,11 @@
consumer.close
end
end
ensure
producer.close
- Racecar::Datadog.close if Object.const_defined?("Racecar::Datadog")
+ Racecar::Datadog.close if config.datadog_enabled
@instrumenter.instrument("shut_down", instrumentation_payload || {})
end
def stop
@stop_requested = true
@@ -129,15 +129,10 @@
end
end
def consumer
@consumer ||= begin
- # Manually store offset after messages have been processed successfully
- # to avoid marking failed messages as committed. The call just updates
- # a value within librdkafka and is asynchronously written to proper
- # storage through auto commits.
- config.consumer << "enable.auto.offset.store=false"
ConsumerSet.new(config, logger, @instrumenter)
end
end
def producer
@@ -211,24 +206,24 @@
last_create_time: last.timestamp,
message_count: messages.size
}
@instrumenter.instrument("start_process_batch", instrumentation_payload)
- @instrumenter.instrument("process_batch", instrumentation_payload) do
- with_pause(first.topic, first.partition, first.offset..last.offset) do |pause|
- begin
+ with_pause(first.topic, first.partition, first.offset..last.offset) do |pause|
+ begin
+ @instrumenter.instrument("process_batch", instrumentation_payload) do
racecar_messages = messages.map do |message|
Racecar::Message.new(message, retries_count: pause.pauses_count)
end
processor.process_batch(racecar_messages)
processor.deliver!
consumer.store_offset(messages.last)
- rescue => e
- instrumentation_payload[:unrecoverable_delivery_error] = reset_producer_on_unrecoverable_delivery_errors(e)
- instrumentation_payload[:retries_count] = pause.pauses_count
- config.error_handler.call(e, instrumentation_payload)
- raise e
end
+ rescue => e
+ instrumentation_payload[:unrecoverable_delivery_error] = reset_producer_on_unrecoverable_delivery_errors(e)
+ instrumentation_payload[:retries_count] = pause.pauses_count
+ config.error_handler.call(e, instrumentation_payload)
+ raise e
end
end
end
# librdkafka will continue to try to deliver already queued messages, even if ruby-rdkafka