Sha256: c571dc7f388d5ee52f0ee9be55f66d34aeca1cef0e9e2777b63712d2efb82801
Contents?: true
Size: 1.98 KB
Versions: 52
Compression:
Stored size: 1.98 KB
Contents
# frozen_string_literal: true module Karafka module ActiveJob # Dispatcher that sends the ActiveJob job to a proper topic based on the queue name class Dispatcher # Defaults for dispatching # The can be updated by using `#karafka_options` on the job DEFAULTS = { dispatch_method: :produce_async, dispatch_many_method: :produce_many_async }.freeze private_constant :DEFAULTS # @param job [ActiveJob::Base] job def dispatch(job) ::Karafka.producer.public_send( fetch_option(job, :dispatch_method, DEFAULTS), topic: job.queue_name, payload: ::ActiveSupport::JSON.encode(serialize_job(job)) ) end # Bulk dispatches multiple jobs using the Rails 7.1+ API # @param jobs [Array<ActiveJob::Base>] jobs we want to dispatch def dispatch_many(jobs) # Group jobs by their desired dispatch method # It can be configured per job class, so we need to make sure we divide them dispatches = Hash.new { |hash, key| hash[key] = [] } jobs.each do |job| d_method = fetch_option(job, :dispatch_many_method, DEFAULTS) dispatches[d_method] << { topic: job.queue_name, payload: ::ActiveSupport::JSON.encode(serialize_job(job)) } end dispatches.each do |type, messages| ::Karafka.producer.public_send( type, messages ) end end private # @param job [ActiveJob::Base] job # @param key [Symbol] key we want to fetch # @param defaults [Hash] # @return [Object] options we are interested in def fetch_option(job, key, defaults) job .class .karafka_options .fetch(key, defaults.fetch(key)) end # @param job [ActiveJob::Base] job # @return [Hash] json representation of the job def serialize_job(job) job.serialize end end end end
Version data entries
52 entries across 52 versions & 1 rubygems