Sha256: d9dcda2158e893c21e3fc3180c5dbbc1e58b6f2c4be31ea25e6081a14a91d5cc

Contents?: true

Size: 1.93 KB

Versions: 18

Compression:

Stored size: 1.93 KB

Contents

# frozen_string_literal: true

# This Karafka component is a Pro component under a commercial license.
# This Karafka component is NOT licensed under LGPL.
#
# All of the commercial components are present in the lib/karafka/pro directory of this
# repository and their usage requires commercial license agreement.
#
# Karafka has also commercial-friendly license, commercial support and commercial components.
#
# By sending a pull request to the pro components, you are agreeing to transfer the copyright of
# your code to Maciej Mensfeld.

module Karafka
  module Pro
    module Processing
      # Uses the jobs queue API to lock (pause) and unlock (resume) operations of a given
      # subscription group. It is abstracted away from jobs queue on this layer because we do
      # not want to introduce jobs queue as a concept to the consumers layer
      class SubscriptionGroupsCoordinator
        include Singleton

        # @param subscription_group [Karafka::Routing::SubscriptionGroup] subscription group we
        #   want to pause
        # @param lock_id [Object] key we want to use if we want to set multiple locks on the same
        #   subscription group
        # @param kwargs [Object] Any keyword arguments accepted by the jobs queue lock.
        def pause(subscription_group, lock_id = nil, **kwargs)
          jobs_queue.lock_async(
            subscription_group.id,
            lock_id,
            **kwargs
          )
        end

        # @param subscription_group [Karafka::Routing::SubscriptionGroup] subscription group we
        #   want to resume
        # @param lock_id [Object] lock id (if it was used to pause)
        def resume(subscription_group, lock_id = nil)
          jobs_queue.unlock_async(subscription_group.id, lock_id)
        end

        private

        # @return [Karafka::Pro::Processing::JobsQueue]
        def jobs_queue
          @jobs_queue ||= Karafka::Server.jobs_queue
        end
      end
    end
  end
end

Version data entries

18 entries across 18 versions & 1 rubygems

Version Path
karafka-2.4.16 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.15 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.14 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.13 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.12 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.11 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.10 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.9 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.8 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.7 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.6 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.5 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.4 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.3 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.0 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.0.rc1 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.0.beta2 lib/karafka/pro/processing/subscription_groups_coordinator.rb
karafka-2.4.0.beta1 lib/karafka/pro/processing/subscription_groups_coordinator.rb