Sha256: 97ef31e5b0aac5cc0bd7d987fc33c1d95609ded7b820cd0db96089d02d3c5cac

Contents?: true

Size: 1.81 KB

Versions: 9

Compression:

Stored size: 1.81 KB

Contents

# frozen_string_literal: true

module Karafka
  module ActiveJob
    # Routes ActiveJob jobs to Kafka, using the queue name of each job as the target topic
    class Dispatcher
      # Producer methods used when a job class does not configure its own
      # They can be overridden per job class via `#karafka_options`
      DEFAULTS = {
        dispatch_method: :produce_async,
        dispatch_many_method: :produce_many_async
      }.freeze

      private_constant :DEFAULTS

      # Dispatches a single job using the producer method configured for its class
      # @param job [ActiveJob::Base] job
      def dispatch(job)
        producer_method = fetch_option(job, :dispatch_method, DEFAULTS)

        ::Karafka.producer.public_send(producer_method, **build_message(job))
      end

      # Bulk dispatches multiple jobs using the Rails 7.1+ API
      # @param jobs [Array<ActiveJob::Base>] jobs we want to dispatch
      def dispatch_many(jobs)
        # The bulk producer method is configurable per job class, so messages are bucketed by
        # the method that should deliver them and each bucket is sent with a separate call
        batches = Hash.new { |grouped, producer_method| grouped[producer_method] = [] }

        jobs.each do |job|
          batches[fetch_option(job, :dispatch_many_method, DEFAULTS)] << build_message(job)
        end

        batches.each_pair do |producer_method, messages|
          ::Karafka.producer.public_send(producer_method, messages)
        end
      end

      private

      # @param job [ActiveJob::Base] job
      # @param key [Symbol] key we want to fetch
      # @param defaults [Hash] fallback values used when the job class does not define the key
      # @return [Object] options we are interested in
      def fetch_option(job, key, defaults)
        options = job.class.karafka_options

        options.fetch(key, defaults.fetch(key))
      end

      # @param job [ActiveJob::Base] job
      # @return [Hash] message with the job queue name as the topic and the JSON encoded
      #   serialized job as the payload
      def build_message(job)
        {
          topic: job.queue_name,
          payload: ::ActiveSupport::JSON.encode(job.serialize)
        }
      end
    end
  end
end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
karafka-2.0.41 lib/karafka/active_job/dispatcher.rb
karafka-2.0.40 lib/karafka/active_job/dispatcher.rb
karafka-2.0.39 lib/karafka/active_job/dispatcher.rb
karafka-2.0.38 lib/karafka/active_job/dispatcher.rb
karafka-2.0.37 lib/karafka/active_job/dispatcher.rb
karafka-2.0.36 lib/karafka/active_job/dispatcher.rb
karafka-2.0.35 lib/karafka/active_job/dispatcher.rb
karafka-2.0.34 lib/karafka/active_job/dispatcher.rb
karafka-2.0.33 lib/karafka/active_job/dispatcher.rb