lib/deimos/utils/inline_consumer.rb in deimos-ruby-1.8.3 vs lib/deimos/utils/inline_consumer.rb in deimos-ruby-1.8.4

- old
+ new

@@ -4,27 +4,34 @@ # Assumes that you have a topic with only one partition. module Deimos module Utils # Listener that can seek to get the last X messages in a topic. class SeekListener < Phobos::Listener + MAX_SEEK_RETRIES = 3 attr_accessor :num_messages # :nodoc: def start_listener @num_messages ||= 10 @consumer = create_kafka_consumer @consumer.subscribe(topic, @subscribe_opts) + attempt = 0 begin + attempt += 1 last_offset = @kafka_client.last_offset_for(topic, 0) offset = last_offset - num_messages if offset.positive? Deimos.config.logger.info("Seeking to #{offset}") @consumer.seek(topic, 0, offset) end rescue StandardError => e - "Could not seek to offset: #{e.message}" + if attempt < MAX_SEEK_RETRIES + sleep(1.seconds * attempt) + retry + end + log_error("Could not seek to offset: #{e.message} after #{MAX_SEEK_RETRIES} retries", listener_metadata) end instrument('listener.start_handler', listener_metadata) do @handler_class.start(@kafka_client) end @@ -48,10 +55,9 @@ self.total_messages = [] end # :nodoc: def consume(payload, metadata) - puts "Got #{payload}" self.class.total_messages << { key: metadata[:key], payload: payload } end