lib/deimos/utils/inline_consumer.rb in deimos-ruby-1.16.3 vs lib/deimos/utils/inline_consumer.rb in deimos-ruby-1.16.4
- old
+ new
@@ -4,14 +4,16 @@
# Assumes that you have a topic with only one partition.
module Deimos
module Utils
# Listener that can seek to get the last X messages in a topic.
class SeekListener < Phobos::Listener
+ # @return [Integer]
MAX_SEEK_RETRIES = 3
+ # @return [Integer]
attr_accessor :num_messages
- # :nodoc:
+ # @return [void]
def start_listener
@num_messages ||= 10
@consumer = create_kafka_consumer
@consumer.subscribe(topic, @subscribe_opts)
attempt = 0
@@ -43,43 +45,48 @@
class MessageBankHandler < Deimos::Consumer
include Phobos::Handler
cattr_accessor :total_messages
- # @param klass [Class < Deimos::Consumer]
+ # @param klass [Class<Deimos::Consumer>]
+ # @return [void]
def self.config_class=(klass)
self.config.merge!(klass.config)
end
- # :nodoc:
+ # @param _kafka_client [Kafka::Client]
+ # @return [void]
def self.start(_kafka_client)
self.total_messages = []
end
- # :nodoc:
+ # @param payload [Hash]
+ # @param metadata [Hash]
def consume(payload, metadata)
self.class.total_messages << {
key: metadata[:key],
payload: payload
}
end
end
# Class which can process/consume messages inline.
class InlineConsumer
+ # @return [Integer]
MAX_MESSAGE_WAIT_TIME = 1.second
+ # @return [Integer]
MAX_TOPIC_WAIT_TIME = 10.seconds
# Get the last X messages from a topic. You can specify a subclass of
# Deimos::Consumer or Deimos::Producer, or provide the
# schema, namespace and key_config directly.
# @param topic [String]
- # @param config_class [Class < Deimos::Consumer|Deimos::Producer>]
+ # @param config_class [Class<Deimos::Consumer>,Class<Deimos::Producer>]
# @param schema [String]
# @param namespace [String]
# @param key_config [Hash]
- # @param num_messages [Number]
+ # @param num_messages [Integer]
# @return [Array<Hash>]
def self.get_messages_for(topic:, schema: nil, namespace: nil, key_config: nil,
config_class: nil, num_messages: 10)
if config_class
MessageBankHandler.config_class = config_class
@@ -104,9 +111,10 @@
# Consume the last X messages from a topic.
# @param topic [String]
# @param frk_consumer [Class]
# @param num_messages [Integer] If this number is >= the number
# of messages in the topic, all messages will be consumed.
+ # @return [void]
def self.consume(topic:, frk_consumer:, num_messages: 10)
listener = SeekListener.new(
handler: frk_consumer,
group_id: SecureRandom.hex,
topic: topic,