Sha256: e75bb8c37e6838cc01f81d68069d60e414e0454d813d89dc6fe3f47f02879345

Contents?: true

Size: 1.97 KB

Versions: 5

Compression:

Stored size: 1.97 KB

Contents

# frozen_string_literal: true

# This Karafka component is a Pro component.
# All of the commercial components are present in the lib/karafka/pro directory of this
# repository and their usage requires commercial license agreement.
#
# Karafka has also commercial-friendly license, commercial support and commercial components.
#
# By sending a pull request to the pro components, you are agreeing to transfer the copyright of
# your code to Maciej Mensfeld.

module Karafka
  module Pro
    # Karafka Pro ActiveJob components
    module ActiveJob
      # Pro dispatcher that sends the ActiveJob job to a proper topic based on the queue name
      # and that allows to inject additional options into the producer, effectively allowing for a
      # much better and more granular control over the dispatch and consumption process.
      class Dispatcher < ::Karafka::ActiveJob::Dispatcher
        # Defaults for dispatching
        # They can be updated by using `#karafka_options` on the job
        DEFAULTS = {
          dispatch_method: :produce_async,
          # We don't create a dummy proc based partitioner as we would have to evaluate it with
          # each job.
          partitioner: nil
        }.freeze

        private_constant :DEFAULTS

        # @param job [ActiveJob::Base] job
        def call(job)
          ::Karafka.producer.public_send(
            fetch_option(job, :dispatch_method, DEFAULTS),
            dispatch_details(job).merge!(
              topic: job.queue_name,
              payload: ::ActiveSupport::JSON.encode(job.serialize)
            )
          )
        end

        private

        # @param job [ActiveJob::Base] job instance
        # @return [Hash] hash with dispatch details to which we merge topic and payload
        def dispatch_details(job)
          partitioner = fetch_option(job, :partitioner, DEFAULTS)

          return {} unless partitioner

          {
            partition_key: partitioner.call(job)
          }
        end
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
karafka-2.0.0.rc2 lib/karafka/pro/active_job/dispatcher.rb
karafka-2.0.0.rc1 lib/karafka/pro/active_job/dispatcher.rb
karafka-2.0.0.beta5 lib/karafka/pro/active_job/dispatcher.rb
karafka-2.0.0.beta4 lib/karafka/pro/active_job/dispatcher.rb
karafka-2.0.0.beta3 lib/karafka/pro/active_job/dispatcher.rb