lib/racecar/consumer_set.rb in racecar-2.1.0 vs lib/racecar/consumer_set.rb in racecar-2.1.1

- old
+ new

@@ -10,55 +10,45 @@ raise ArgumentError, "Subscriptions must not be empty when subscribing" if @config.subscriptions.empty? @consumers = [] @consumer_id_iterator = (0...@config.subscriptions.size).cycle + @previous_retries = 0 + @last_poll_read_nil_message = false end - def poll(timeout_ms) + def poll(max_wait_time_ms = @config.max_wait_time) + batch_poll(max_wait_time_ms, 1).first + end + + # batch_poll collects messages until any of the following occurs: + # - max_wait_time_ms time has passed + # - max_messages have been collected + # - a nil message was polled (end of topic, Kafka stalled, etc.) + # + # The messages are from a single topic, but potentially from more than one partition. + # + # Any errors during polling are retried in an exponential backoff fashion. If an error + # occurs, but there is no time left for a backoff and retry, it will return the + # already collected messages and only retry on the next call. + def batch_poll(max_wait_time_ms = @config.max_wait_time, max_messages = @config.fetch_messages) + started_at = Time.now + remain_ms = max_wait_time_ms maybe_select_next_consumer - started_at ||= Time.now - try ||= 0 - remain ||= timeout_ms + messages = [] - msg = remain <= 0 ? nil : current.poll(remain) - rescue Rdkafka::RdkafkaError => e - wait_before_retry_ms = 100 * (2**try) # 100ms, 200ms, 400ms, … - try += 1 - raise if try >= MAX_POLL_TRIES || remain <= wait_before_retry_ms - - @logger.error "(try #{try}): Error for topic subscription #{current_subscription}: #{e}" - - case e.code - when :max_poll_exceeded, :transport # -147, -195 - reset_current_consumer + while remain_ms > 0 && messages.size < max_messages + remain_ms = remaining_time_ms(max_wait_time_ms, started_at) + msg = poll_with_retries(remain_ms) + break if msg.nil? + messages << msg end - remain = remaining_time_ms(timeout_ms, started_at) - raise if remain <= wait_before_retry_ms - - sleep wait_before_retry_ms/1000.0 - retry - ensure - @last_poll_read_nil_message = true if msg.nil? + messages end - # XXX: messages are not guaranteed to be from the same partition - def batch_poll(timeout_ms) - @batch_started_at = Time.now - @messages = [] - while collect_messages_for_batch? do - remain = remaining_time_ms(timeout_ms, @batch_started_at) - break if remain <= 0 - msg = poll(remain) - break if msg.nil? - @messages << msg - end - @messages - end - def store_offset(message) current.store_offset(message) end def commit @@ -123,10 +113,56 @@ end end private + # polls a single message from the current consumer, retrying errors with exponential + # backoff. The sleep time is capped by max_wait_time_ms. If there's enough time budget + # left, it will retry before returning. If there isn't, the retry will only occur on + # the next call. It tries up to MAX_POLL_TRIES before passing on the exception. + def poll_with_retries(max_wait_time_ms) + try ||= @previous_retries + @previous_retries = 0 + started_at ||= Time.now + remain_ms = remaining_time_ms(max_wait_time_ms, started_at) + + wait_ms = try == 0 ? 0 : 50 * (2**try) # 0ms, 100ms, 200ms, 400ms, … + if wait_ms >= max_wait_time_ms && remain_ms > 1 + @logger.debug "Capping #{wait_ms}ms to #{max_wait_time_ms-1}ms." + sleep (max_wait_time_ms-1)/1000.0 + remain_ms = 1 + elsif wait_ms >= remain_ms + @logger.error "Only #{remain_ms}ms left, but want to wait for #{wait_ms}ms before poll. Will retry on next call." + @previous_retries = try + return nil + elsif wait_ms > 0 + sleep wait_ms/1000.0 + remain_ms -= wait_ms + end + + poll_current_consumer(remain_ms) + rescue Rdkafka::RdkafkaError => e + try += 1 + @instrumenter.instrument("poll_retry", try: try, rdkafka_time_limit: remain_ms, exception: e) + @logger.error "(try #{try}/#{MAX_POLL_TRIES}): Error for topic subscription #{current_subscription}: #{e}" + raise if try >= MAX_POLL_TRIES + retry + end + + # polls a message for the current consumer, handling any API edge cases. + def poll_current_consumer(max_wait_time_ms) + msg = current.poll(max_wait_time_ms) + rescue Rdkafka::RdkafkaError => e + case e.code + when :max_poll_exceeded, :transport # -147, -195 + reset_current_consumer + end + raise + ensure + @last_poll_read_nil_message = msg.nil? + end + def find_consumer_by(topic, partition) each do |consumer| tpl = consumer.assignment.to_h rdkafka_partition = tpl[topic]&.detect { |part| part.partition == partition } next unless rdkafka_partition @@ -140,11 +176,16 @@ def current_subscription @config.subscriptions[@consumer_id_iterator.peek] end def reset_current_consumer - @consumers[@consumer_id_iterator.peek] = nil + current_consumer_id = @consumer_id_iterator.peek + @logger.info "Resetting consumer with id: #{current_consumer_id}" + + consumer = @consumers[current_consumer_id] + consumer.close unless consumer.nil? + @consumers[current_consumer_id] = nil end def maybe_select_next_consumer return unless @last_poll_read_nil_message @last_poll_read_nil_message = false @@ -158,14 +199,9 @@ def commit_rescue_no_offset(consumer) consumer.commit(nil, !@config.synchronous_commits) rescue Rdkafka::RdkafkaError => e raise e if e.code != :no_offset @logger.debug "Nothing to commit." - end - - def collect_messages_for_batch? - @messages.size < @config.fetch_messages && - (Time.now - @batch_started_at) < @config.max_wait_time end def rdkafka_config(subscription) # https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md config = {