Sha256: c571dc7f388d5ee52f0ee9be55f66d34aeca1cef0e9e2777b63712d2efb82801

Contents?: true

Size: 1.98 KB

Versions: 52

Compression:

Stored size: 1.98 KB

Contents

# frozen_string_literal: true

module Karafka
  module ActiveJob
    # Dispatcher that sends the ActiveJob job to a proper topic based on the queue name
    class Dispatcher
      # Defaults for dispatching
      # The can be updated by using `#karafka_options` on the job
      DEFAULTS = {
        dispatch_method: :produce_async,
        dispatch_many_method: :produce_many_async
      }.freeze

      private_constant :DEFAULTS

      # @param job [ActiveJob::Base] job
      def dispatch(job)
        ::Karafka.producer.public_send(
          fetch_option(job, :dispatch_method, DEFAULTS),
          topic: job.queue_name,
          payload: ::ActiveSupport::JSON.encode(serialize_job(job))
        )
      end

      # Bulk dispatches multiple jobs using the Rails 7.1+ API
      # @param jobs [Array<ActiveJob::Base>] jobs we want to dispatch
      def dispatch_many(jobs)
        # Group jobs by their desired dispatch method
        # It can be configured per job class, so we need to make sure we divide them
        dispatches = Hash.new { |hash, key| hash[key] = [] }

        jobs.each do |job|
          d_method = fetch_option(job, :dispatch_many_method, DEFAULTS)

          dispatches[d_method] << {
            topic: job.queue_name,
            payload: ::ActiveSupport::JSON.encode(serialize_job(job))
          }
        end

        dispatches.each do |type, messages|
          ::Karafka.producer.public_send(
            type,
            messages
          )
        end
      end

      private

      # @param job [ActiveJob::Base] job
      # @param key [Symbol] key we want to fetch
      # @param defaults [Hash]
      # @return [Object] options we are interested in
      def fetch_option(job, key, defaults)
        job
          .class
          .karafka_options
          .fetch(key, defaults.fetch(key))
      end

      # @param job [ActiveJob::Base] job
      # @return [Hash] json representation of the job
      def serialize_job(job)
        job.serialize
      end
    end
  end
end

Version data entries

52 entries across 52 versions & 1 rubygems

Version Path
karafka-2.4.11 lib/karafka/active_job/dispatcher.rb
karafka-2.4.10 lib/karafka/active_job/dispatcher.rb
karafka-2.4.9 lib/karafka/active_job/dispatcher.rb
karafka-2.4.8 lib/karafka/active_job/dispatcher.rb
karafka-2.4.7 lib/karafka/active_job/dispatcher.rb
karafka-2.4.6 lib/karafka/active_job/dispatcher.rb
karafka-2.4.5 lib/karafka/active_job/dispatcher.rb
karafka-2.4.4 lib/karafka/active_job/dispatcher.rb
karafka-2.4.3 lib/karafka/active_job/dispatcher.rb
karafka-2.4.0 lib/karafka/active_job/dispatcher.rb
karafka-2.4.0.rc1 lib/karafka/active_job/dispatcher.rb
karafka-2.3.4 lib/karafka/active_job/dispatcher.rb
karafka-2.4.0.beta2 lib/karafka/active_job/dispatcher.rb
karafka-2.4.0.beta1 lib/karafka/active_job/dispatcher.rb
karafka-2.3.3 lib/karafka/active_job/dispatcher.rb
karafka-2.3.2 lib/karafka/active_job/dispatcher.rb
karafka-2.3.1 lib/karafka/active_job/dispatcher.rb
karafka-2.3.0 lib/karafka/active_job/dispatcher.rb
karafka-2.3.0.rc1 lib/karafka/active_job/dispatcher.rb
karafka-2.3.0.alpha2 lib/karafka/active_job/dispatcher.rb