lib/deimos/utils/inline_consumer.rb in deimos-ruby-1.8.3 vs lib/deimos/utils/inline_consumer.rb in deimos-ruby-1.8.4
- old
+ new
@@ -4,27 +4,34 @@
# Assumes that you have a topic with only one partition.
module Deimos
module Utils
# Listener that can seek to get the last X messages in a topic.
class SeekListener < Phobos::Listener
+ MAX_SEEK_RETRIES = 3
attr_accessor :num_messages
# :nodoc:
def start_listener
@num_messages ||= 10
@consumer = create_kafka_consumer
@consumer.subscribe(topic, @subscribe_opts)
+ attempt = 0
begin
+ attempt += 1
last_offset = @kafka_client.last_offset_for(topic, 0)
offset = last_offset - num_messages
if offset.positive?
Deimos.config.logger.info("Seeking to #{offset}")
@consumer.seek(topic, 0, offset)
end
rescue StandardError => e
- "Could not seek to offset: #{e.message}"
+ if attempt < MAX_SEEK_RETRIES
+ sleep(1.seconds * attempt)
+ retry
+ end
+ log_error("Could not seek to offset: #{e.message} after #{MAX_SEEK_RETRIES} retries", listener_metadata)
end
instrument('listener.start_handler', listener_metadata) do
@handler_class.start(@kafka_client)
end
@@ -48,10 +55,9 @@
self.total_messages = []
end
# :nodoc:
def consume(payload, metadata)
- puts "Got #{payload}"
self.class.total_messages << {
key: metadata[:key],
payload: payload
}
end