Sha256: 0765fc60b24df9f4611a92d71ea8e89ff9945c8830b1687090073f41c4500429
Contents?: true
Size: 1.39 KB
Versions: 26
Compression:
Stored size: 1.39 KB
Contents
# frozen_string_literal: true module Karafka module Connection # Class that delegates processing of a single received message for which we listen to # a proper processor module MessageDelegator class << self # Delegates message (does something with it) # It will either schedule or run a proper processor action for the incoming message # @param group_id [String] group_id of a group from which a given message came # @param kafka_message [<Kafka::FetchedMessage>] raw message from kafka # @note This should be looped to obtain a constant delegating of new messages def call(group_id, kafka_message) topic = Persistence::Topics.fetch(group_id, kafka_message.topic) consumer = Persistence::Consumers.fetch(topic, kafka_message.partition) Karafka.monitor.instrument( 'connection.message_delegator.call', caller: self, consumer: consumer, kafka_message: kafka_message ) do # @note We always get a single message within single delegator, which means that # we don't care if user marked it as a batch consumed or not. consumer.params_batch = Params::Builders::ParamsBatch.from_kafka_messages( [kafka_message], topic ) consumer.call end end end end end end
Version data entries
26 entries across 26 versions & 1 rubygems