lib/deimos/utils/inline_consumer.rb in deimos-ruby-1.16.3 vs lib/deimos/utils/inline_consumer.rb in deimos-ruby-1.16.4

- old
+ new

@@ -4,14 +4,16 @@ # Assumes that you have a topic with only one partition. module Deimos module Utils # Listener that can seek to get the last X messages in a topic. class SeekListener < Phobos::Listener + # @return [Integer] MAX_SEEK_RETRIES = 3 + # @return [Integer] attr_accessor :num_messages - # :nodoc: + # @return [void] def start_listener @num_messages ||= 10 @consumer = create_kafka_consumer @consumer.subscribe(topic, @subscribe_opts) attempt = 0 @@ -43,43 +45,48 @@ class MessageBankHandler < Deimos::Consumer include Phobos::Handler cattr_accessor :total_messages - # @param klass [Class < Deimos::Consumer] + # @param klass [Class<Deimos::Consumer>] + # @return [void] def self.config_class=(klass) self.config.merge!(klass.config) end - # :nodoc: + # @param _kafka_client [Kafka::Client] + # @return [void] def self.start(_kafka_client) self.total_messages = [] end - # :nodoc: + # @param payload [Hash] + # @param metadata [Hash] def consume(payload, metadata) self.class.total_messages << { key: metadata[:key], payload: payload } end end # Class which can process/consume messages inline. class InlineConsumer + # @return [Integer] MAX_MESSAGE_WAIT_TIME = 1.second + # @return [Integer] MAX_TOPIC_WAIT_TIME = 10.seconds # Get the last X messages from a topic. You can specify a subclass of # Deimos::Consumer or Deimos::Producer, or provide the # schema, namespace and key_config directly. # @param topic [String] - # @param config_class [Class < Deimos::Consumer|Deimos::Producer>] + # @param config_class [Class<Deimos::Consumer>,Class<Deimos::Producer>] # @param schema [String] # @param namespace [String] # @param key_config [Hash] - # @param num_messages [Number] + # @param num_messages [Integer] # @return [Array<Hash>] def self.get_messages_for(topic:, schema: nil, namespace: nil, key_config: nil, config_class: nil, num_messages: 10) if config_class MessageBankHandler.config_class = config_class @@ -104,9 +111,10 @@ # Consume the last X messages from a topic. # @param topic [String] # @param frk_consumer [Class] # @param num_messages [Integer] If this number is >= the number # of messages in the topic, all messages will be consumed. + # @return [void] def self.consume(topic:, frk_consumer:, num_messages: 10) listener = SeekListener.new( handler: frk_consumer, group_id: SecureRandom.hex, topic: topic,