Sha256: 91f7eb6a3731c51c314408be7982c8eb14d8b719de4b684eb74dd1816e55d40b
Contents?: true
Size: 1.32 KB
Versions: 1
Compression:
Stored size: 1.32 KB
Contents
# frozen_string_literal: true

module Karafka
  module Connection
    # This object represents a collective status of execution of group of listeners running inside
    # of one consumer group but potentially in separate subscription groups.
    #
    # There are cases when we do not want to close a given client when others from the same
    # consumer group are running because it can cause instabilities due to early shutdown of some
    # of the clients out of same consumer group.
    #
    # Here we can track it and only shutdown listeners when all work in a group is done.
    class ConsumerGroupStatus
      # @param group_size [Integer] number of separate subscription groups in a consumer group
      def initialize(group_size)
        @mutex = Mutex.new
        @active_size = group_size
      end

      # @return [Boolean] Are there any listeners that are still doing any type of work. If not,
      #   it means a consumer group is safe to be shutdown fully.
      def working?
        @active_size.positive?
      end

      # Decrements number of working listeners in the group by one until there's none
      def finish
        @mutex.synchronize do
          @active_size -= 1

          return if @active_size >= 0

          raise Errors::InvalidConsumerGroupStatusError, @active_size
        end
      end
    end
  end
end
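A minimal sketch of how the counter above might be exercised, assuming one thread per listener. `GroupStatusSketch` and `InvalidStatusError` are hypothetical stand-ins written for this example so that it runs without the gem; they mirror the logic of `ConsumerGroupStatus` but are not part of Karafka.

```ruby
# frozen_string_literal: true

# Hypothetical stand-in for Karafka::Errors::InvalidConsumerGroupStatusError
class InvalidStatusError < StandardError; end

# Hypothetical simplified copy of the counter logic, outside the Karafka namespace
class GroupStatusSketch
  def initialize(group_size)
    @mutex = Mutex.new
    @active_size = group_size
  end

  # True while at least one listener has not reported completion
  def working?
    @active_size.positive?
  end

  # Marks one listener as done; raises if called more times than group_size
  def finish
    @mutex.synchronize do
      @active_size -= 1

      return if @active_size >= 0

      raise InvalidStatusError, @active_size.to_s
    end
  end
end

status = GroupStatusSketch.new(3)

# Each thread plays the role of one listener reporting that its work is done
listeners = Array.new(3) { Thread.new { status.finish } }
listeners.each(&:join)

puts status.working?

# Calling finish once more than there are listeners is reported as an error
begin
  status.finish
rescue InvalidStatusError => e
  puts "over-finished: #{e.message}"
end
```

The decrement runs inside the mutex, so concurrent `finish` calls cannot lose an update, and the negative-count check turns a double `finish` into an explicit error instead of a silently wrong state.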
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
karafka-2.0.18 | lib/karafka/connection/consumer_group_status.rb |