lib/racecar/consumer_set.rb in racecar-2.1.0 vs lib/racecar/consumer_set.rb in racecar-2.1.1
- old
+ new
@@ -10,55 +10,45 @@
raise ArgumentError, "Subscriptions must not be empty when subscribing" if @config.subscriptions.empty?
@consumers = []
@consumer_id_iterator = (0...@config.subscriptions.size).cycle
+ @previous_retries = 0
+
@last_poll_read_nil_message = false
end
- def poll(timeout_ms)
+ def poll(max_wait_time_ms = @config.max_wait_time) # NOTE(review): the removed collect_messages_for_batch? compared max_wait_time with a Time difference (seconds), here it is the default for a millisecond value — confirm the unit
+ batch_poll(max_wait_time_ms, 1).first # batch capped at one message; nil when the batch comes back empty
+ end
+
+ # batch_poll collects messages until any of the following occurs:
+ # - max_wait_time_ms milliseconds have passed since the call started
+ # - max_messages have been collected
+ # - a nil message was polled (end of topic, Kafka stalled, etc.)
+ #
+ # The messages are from a single topic, but potentially from more than one partition.
+ #
+ # Any errors during polling are retried in an exponential backoff fashion. If an error
+ # occurs, but there is no time left for a backoff and retry, it will return the
+ # already collected messages (possibly an empty array) and only retry on the next call.
+ def batch_poll(max_wait_time_ms = @config.max_wait_time, max_messages = @config.fetch_messages) # NOTE(review): max_wait_time is used as a millisecond default here — confirm the unit
+ started_at = Time.now
+ remain_ms = max_wait_time_ms # start with the whole budget so the loop condition can pass on the first iteration
maybe_select_next_consumer # only its guard is visible in this diff; presumably switches consumer after a nil poll — verify
- started_at ||= Time.now
- try ||= 0
- remain ||= timeout_ms
+ messages = []
- msg = remain <= 0 ? nil : current.poll(remain)
- rescue Rdkafka::RdkafkaError => e
- wait_before_retry_ms = 100 * (2**try) # 100ms, 200ms, 400ms, …
- try += 1
- raise if try >= MAX_POLL_TRIES || remain <= wait_before_retry_ms
-
- @logger.error "(try #{try}): Error for topic subscription #{current_subscription}: #{e}"
-
- case e.code
- when :max_poll_exceeded, :transport # -147, -195
- reset_current_consumer
+ while remain_ms > 0 && messages.size < max_messages
+ remain_ms = remaining_time_ms(max_wait_time_ms, started_at) # refresh the budget relative to the batch start (helper defined outside this diff)
+ msg = poll_with_retries(remain_ms) # nil when nothing was polled or when a retry had to be deferred
+ break if msg.nil? # stop early and return whatever has been collected so far
+ messages << msg
end
- remain = remaining_time_ms(timeout_ms, started_at)
- raise if remain <= wait_before_retry_ms
-
- sleep wait_before_retry_ms/1000.0
- retry
- ensure
- @last_poll_read_nil_message = true if msg.nil?
+ messages
end
- # XXX: messages are not guaranteed to be from the same partition
- def batch_poll(timeout_ms)
- @batch_started_at = Time.now # batch state was kept in instance variables in this version
- @messages = []
- while collect_messages_for_batch? do # size and elapsed-time check (helper removed in the same diff)
- remain = remaining_time_ms(timeout_ms, @batch_started_at)
- break if remain <= 0
- msg = poll(remain) # one single-message poll per iteration, given the remaining budget
- break if msg.nil?
- @messages << msg
- end
- @messages
- end
-
# Delegates to store_offset on the current consumer for the given message.
def store_offset(message)
current.store_offset(message)
end
def commit
@@ -123,10 +113,56 @@
end
end
private
+ # Polls a single message from the current consumer, retrying errors with exponential
+ # backoff. The sleep time is capped by max_wait_time_ms. If there's enough time budget
+ # left, it will retry before returning. If there isn't, it returns nil and the retry only
+ # occurs on the next call. After MAX_POLL_TRIES failed tries the exception is re-raised.
+ def poll_with_retries(max_wait_time_ms)
+ try ||= @previous_retries # ||= keeps the incremented count when the rescue below re-enters the body via retry
+ @previous_retries = 0 # consumed; set again below only when the retry has to be deferred
+ started_at ||= Time.now # also kept across retries, so remain_ms accounts for time already spent
+ remain_ms = remaining_time_ms(max_wait_time_ms, started_at) # helper defined outside this diff
+
+ wait_ms = try == 0 ? 0 : 50 * (2**try) # 0ms, 100ms, 200ms, 400ms, …
+ if wait_ms >= max_wait_time_ms && remain_ms > 1 # backoff would exceed the whole budget
+ @logger.debug "Capping #{wait_ms}ms to #{max_wait_time_ms-1}ms."
+ sleep (max_wait_time_ms-1)/1000.0 # sleep takes seconds
+ remain_ms = 1 # leave 1ms for the poll itself
+ elsif wait_ms >= remain_ms # NOTE(review): with try == 0 and remain_ms == 0 this branch also fires and logs at error level — confirm intended
+ @logger.error "Only #{remain_ms}ms left, but want to wait for #{wait_ms}ms before poll. Will retry on next call."
+ @previous_retries = try # remember the count so the next call resumes the backoff
+ return nil
+ elsif wait_ms > 0
+ sleep wait_ms/1000.0
+ remain_ms -= wait_ms # the time slept comes out of the poll budget
+ end
+
+ poll_current_consumer(remain_ms)
+ rescue Rdkafka::RdkafkaError => e
+ try += 1
+ @instrumenter.instrument("poll_retry", try: try, rdkafka_time_limit: remain_ms, exception: e)
+ @logger.error "(try #{try}/#{MAX_POLL_TRIES}): Error for topic subscription #{current_subscription}: #{e}"
+ raise if try >= MAX_POLL_TRIES # out of tries: re-raise the rescued error
+ retry # re-runs the method body from the top; try and started_at survive because of ||=
+ end
+
+ # Polls a message from the current consumer; resets the consumer on certain errors and always re-raises.
+ def poll_current_consumer(max_wait_time_ms)
+ msg = current.poll(max_wait_time_ms)
+ rescue Rdkafka::RdkafkaError => e
+ case e.code
+ when :max_poll_exceeded, :transport # -147, -195
+ reset_current_consumer # close and clear the consumer for these two error codes
+ end
+ raise # every error is re-raised so poll_with_retries can count the try
+ ensure
+ @last_poll_read_nil_message = msg.nil? # also true when poll raised, because msg was never assigned
+ end
+
def find_consumer_by(topic, partition)
each do |consumer|
tpl = consumer.assignment.to_h
rdkafka_partition = tpl[topic]&.detect { |part| part.partition == partition }
next unless rdkafka_partition
@@ -140,11 +176,16 @@
# Returns the subscription at the index the consumer-id iterator currently points to.
def current_subscription
@config.subscriptions[@consumer_id_iterator.peek] # peek reads the next index without advancing the iterator
end
def reset_current_consumer
- @consumers[@consumer_id_iterator.peek] = nil
+ current_consumer_id = @consumer_id_iterator.peek # index of the current consumer slot; peek does not advance the iterator
+ @logger.info "Resetting consumer with id: #{current_consumer_id}"
+
+ consumer = @consumers[current_consumer_id]
+ consumer.close unless consumer.nil? # close an existing consumer before dropping it; the slot may already be empty
+ @consumers[current_consumer_id] = nil # clear the slot; presumably refilled on next use — verify against `current`
end
def maybe_select_next_consumer
return unless @last_poll_read_nil_message
@last_poll_read_nil_message = false
@@ -158,14 +199,9 @@
def commit_rescue_no_offset(consumer)
consumer.commit(nil, !@config.synchronous_commits) # second argument is true unless synchronous commits are configured; presumably an async flag — verify
rescue Rdkafka::RdkafkaError => e
raise e if e.code != :no_offset # only the :no_offset error is tolerated; anything else propagates
@logger.debug "Nothing to commit."
- end
-
- def collect_messages_for_batch?
- @messages.size < @config.fetch_messages &&
- (Time.now - @batch_started_at) < @config.max_wait_time
end
def rdkafka_config(subscription)
# https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
config = {