lib/racecar/consumer.rb in racecar-2.2.0 vs lib/racecar/consumer.rb in racecar-2.3.0.alpha1
- old
+ new
@@ -1,15 +1,17 @@
# frozen_string_literal: true
+require "racecar/message_delivery_error"
+
module Racecar
class Consumer
Subscription = Struct.new(:topic, :start_from_beginning, :max_bytes_per_partition, :additional_config)
class << self
attr_accessor :max_wait_time
attr_accessor :group_id
- attr_accessor :producer, :consumer
+ attr_accessor :producer, :consumer, :parallel_workers
def subscriptions
@subscriptions ||= []
end
@@ -23,32 +25,65 @@
# @param max_bytes_per_partition [Integer] the maximum number of bytes to fetch from
# each partition at a time.
# @param additional_config [Hash] Configuration properties for consumer.
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
# @return [nil]
- def subscribes_to(*topics, start_from_beginning: true, max_bytes_per_partition: 1048576, additional_config: {})
+ def subscribes_to(
+ *topics,
+ start_from_beginning: true,
+ max_bytes_per_partition: 1048576,
+ additional_config: {}
+ )
topics.each do |topic|
subscriptions << Subscription.new(topic, start_from_beginning, max_bytes_per_partition, additional_config)
end
end
end
- def configure(producer:, consumer:, instrumenter: NullInstrumenter)
+ def configure(producer:, consumer:, instrumenter: NullInstrumenter, config: Racecar.config)
@producer = producer
+ @delivery_handles = []
+
@consumer = consumer
+
@instrumenter = instrumenter
+ @config = config
end
def teardown; end
- # Delivers messages that got produced.
+ # Blocks until all messages produced so far have been successfully published. If
+ # message delivery finally fails, a Racecar::MessageDeliveryError is raised. The
+ # delivery failed for the reason in the exception. The error can be broker side
+ # (e.g. downtime, configuration issue) or specific to the message being sent. The
+ # caller must handle the latter cases or run into head of line blocking.
def deliver!
@delivery_handles ||= []
if @delivery_handles.any?
instrumentation_payload = { delivered_message_count: @delivery_handles.size }
@instrumenter.instrument('deliver_messages', instrumentation_payload) do
- @delivery_handles.each(&:wait)
+ @delivery_handles.each do |handle|
+ # rdkafka-ruby checks every wait_timeout seconds if the message was
+ # successfully delivered, up to max_wait_timeout seconds before raising
+ # Rdkafka::AbstractHandle::WaitTimeoutError. librdkafka will (re)try to
+ # deliver all messages in the background, until "config.message_timeout"
+ # (message.timeout.ms) is exceeded. Phrased differently, rdkafka-ruby's
+ # WaitTimeoutError is just informative.
+ # The raising can be avoided if max_wait_timeout below is greater than
+ # config.message_timeout, but config is not available here (without
+ # changing the interface).
+ handle.wait(max_wait_timeout: 60, wait_timeout: 0.1)
+ rescue Rdkafka::AbstractHandle::WaitTimeoutError => e
+ partition = MessageDeliveryError.partition_from_delivery_handle(handle)
+ # ideally we could use the logger passed to the Runner, but it is not
+ # available here. The runner sets it for Rdkafka, though, so we can use
+ # that instead.
+ @config.logger.debug "Still trying to deliver message to (partition #{partition})... (will try up to Racecar.config.message_timeout)"
+ retry
+ rescue Rdkafka::RdkafkaError => e
+ raise MessageDeliveryError.new(e, handle)
+ end
end
end
@delivery_handles.clear
end