Sha256: f7e9594bac1bbd8d41306e4b5e0ea006be3025412b92b190ab9d568f145700f1
Contents?: true
Size: 1.79 KB
Versions: 22
Compression:
Stored size: 1.79 KB
Contents
# frozen_string_literal: true module Karafka module Routing # Object representing a set of single consumer group topics that can be subscribed together # with one connection. # # @note One subscription group will always belong to one consumer group, but one consumer # group can have multiple subscription groups. class SubscriptionGroup attr_reader :id, :topics # @param topics [Karafka::Routing::Topics] all the topics that share the same key settings # @return [SubscriptionGroup] built subscription group def initialize(topics) @id = SecureRandom.uuid @topics = topics freeze end # @return [String] consumer group id def consumer_group_id kafka[:'group.id'] end # @return [Integer] max messages fetched in a single go def max_messages @topics.first.max_messages end # @return [Integer] max milliseconds we can wait for incoming messages def max_wait_time @topics.first.max_wait_time end # @return [Hash] kafka settings are a bit special. They are exactly the same for all of the # topics but they lack the group.id (unless explicitly) provided. To make it compatible # with our routing engine, we inject it before it will go to the consumer def kafka kafka = @topics.first.kafka.dup kafka[:'client.id'] ||= Karafka::App.config.client_id kafka[:'group.id'] ||= @topics.first.consumer_group.id kafka[:'auto.offset.reset'] ||= @topics.first.initial_offset # Karafka manages the offsets based on the processing state, thus we do not rely on the # rdkafka offset auto-storing kafka[:'enable.auto.offset.store'] = false kafka.freeze kafka end end end end
Version data entries
22 entries across 22 versions & 1 rubygems
Version | Path |
---|---|
karafka-2.0.0.beta4 | lib/karafka/routing/subscription_group.rb |
karafka-2.0.0.beta3 | lib/karafka/routing/subscription_group.rb |