Sha256: 0765fc60b24df9f4611a92d71ea8e89ff9945c8830b1687090073f41c4500429

Contents?: true

Size: 1.39 KB

Versions: 26

Compression:

Stored size: 1.39 KB

Contents

# frozen_string_literal: true

module Karafka
  module Connection
    # Module responsible for handing a single message received from Kafka over to the
    # consumer that should process it
    module MessageDelegator
      # Routes one incoming message to its consumer and runs that consumer
      #
      # The topic is looked up through Persistence::Topics and the consumer through
      # Persistence::Consumers. Inside an instrumented block the consumer receives a
      # one-element params batch built from the message and is then invoked.
      #
      # @param group_id [String] id of the group from which the given message came
      # @param kafka_message [Kafka::FetchedMessage] raw message from kafka
      # @return [Object] whatever the monitor's instrument call returns
      # @note This should be looped to obtain a constant delegating of new messages
      def self.call(group_id, kafka_message)
        routed_topic = Persistence::Topics.fetch(group_id, kafka_message.topic)
        handler = Persistence::Consumers.fetch(routed_topic, kafka_message.partition)

        Karafka.monitor.instrument(
          'connection.message_delegator.call',
          caller: self,
          consumer: handler,
          kafka_message: kafka_message
        ) do
          # @note A single delegator always gets exactly one message, so it does not
          # matter whether the user marked it as batch consumed or not.
          messages = [kafka_message]
          batch = Params::Builders::ParamsBatch.from_kafka_messages(messages, routed_topic)
          handler.params_batch = batch
          handler.call
        end
      end
    end
  end
end

Version data entries

26 entries across 26 versions & 1 rubygems

Version Path
karafka-1.4.15 lib/karafka/connection/message_delegator.rb
karafka-1.4.14 lib/karafka/connection/message_delegator.rb
karafka-1.4.13 lib/karafka/connection/message_delegator.rb
karafka-1.4.12 lib/karafka/connection/message_delegator.rb
karafka-1.4.11 lib/karafka/connection/message_delegator.rb
karafka-1.4.10 lib/karafka/connection/message_delegator.rb
karafka-1.4.9 lib/karafka/connection/message_delegator.rb
karafka-1.4.8 lib/karafka/connection/message_delegator.rb
karafka-1.4.7 lib/karafka/connection/message_delegator.rb
karafka-1.4.6 lib/karafka/connection/message_delegator.rb
karafka-1.4.5 lib/karafka/connection/message_delegator.rb
karafka-1.4.4 lib/karafka/connection/message_delegator.rb
karafka-1.4.3 lib/karafka/connection/message_delegator.rb
karafka-1.4.2 lib/karafka/connection/message_delegator.rb
karafka-1.4.1 lib/karafka/connection/message_delegator.rb
karafka-1.4.0 lib/karafka/connection/message_delegator.rb
karafka-1.4.0.rc2 lib/karafka/connection/message_delegator.rb
karafka-1.4.0.rc1 lib/karafka/connection/message_delegator.rb
karafka-1.3.7 lib/karafka/connection/message_delegator.rb
karafka-1.3.6 lib/karafka/connection/message_delegator.rb