lib/racecar/runner.rb in racecar-2.9.0 vs lib/racecar/runner.rb in racecar-2.10.0.beta1

- old
+ new

@@ -94,11 +94,11 @@ consumer.close end end ensure producer.close - Racecar::Datadog.close if Object.const_defined?("Racecar::Datadog") + Racecar::Datadog.close if config.datadog_enabled @instrumenter.instrument("shut_down", instrumentation_payload || {}) end def stop @stop_requested = true @@ -129,15 +129,10 @@ end end def consumer @consumer ||= begin - # Manually store offset after messages have been processed successfully - # to avoid marking failed messages as committed. The call just updates - # a value within librdkafka and is asynchronously written to proper - # storage through auto commits. - config.consumer << "enable.auto.offset.store=false" ConsumerSet.new(config, logger, @instrumenter) end end def producer @@ -211,24 +206,24 @@ last_create_time: last.timestamp, message_count: messages.size } @instrumenter.instrument("start_process_batch", instrumentation_payload) - @instrumenter.instrument("process_batch", instrumentation_payload) do - with_pause(first.topic, first.partition, first.offset..last.offset) do |pause| - begin + with_pause(first.topic, first.partition, first.offset..last.offset) do |pause| + begin + @instrumenter.instrument("process_batch", instrumentation_payload) do racecar_messages = messages.map do |message| Racecar::Message.new(message, retries_count: pause.pauses_count) end processor.process_batch(racecar_messages) processor.deliver! consumer.store_offset(messages.last) - rescue => e - instrumentation_payload[:unrecoverable_delivery_error] = reset_producer_on_unrecoverable_delivery_errors(e) - instrumentation_payload[:retries_count] = pause.pauses_count - config.error_handler.call(e, instrumentation_payload) - raise e end + rescue => e + instrumentation_payload[:unrecoverable_delivery_error] = reset_producer_on_unrecoverable_delivery_errors(e) + instrumentation_payload[:retries_count] = pause.pauses_count + config.error_handler.call(e, instrumentation_payload) + raise e end end end # librdkafka will continue to try to deliver already queued messages, even if ruby-rdkafka