Sha256: 28007945da1303ee58a74096ce5c02e8b179ad4fc7439e76d540c8b9de016c19

Contents?: true

Size: 1.54 KB

Versions: 1

Compression:

Stored size: 1.54 KB

Contents

# frozen_string_literal: true

module Karafka
  # Namespace for alternative processing backends for Karafka framework
  module Backends
    # Sidekiq backend that schedules stuff to Sidekiq worker for delayed execution
    module Sidekiq
      # Karafka Sidekiq backend version
      VERSION = '1.4.6'

      # Enqueues the execution of perform method into a worker.
      # @note Each worker needs to have a class #perform_async method that will allow us to pass
      #   parameters into it. We always pass topic as a first argument and this request
      #   params_batch as a second one (we pass topic to be able to build back the consumer
      #   in the worker)
      def process
        Karafka.monitor.instrument('backends.sidekiq.process', caller: self) do
          # We add batch metadata only for batch worker
          batch_metadata_hash = if respond_to?(:batch_metadata)
                                  # We remove deserializer as it's not safe to convert it to json
                                  # and we can rebuild it anyhow based on the routing data in the
                                  # worker
                                  batch_metadata.to_h
                                                .transform_keys(&:to_s)
                                                .tap { |h| h.delete('deserializer') }
                                end

          topic.worker.perform_async(
            topic.id,
            topic.interchanger.encode(params_batch),
            batch_metadata_hash
          )
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
karafka-sidekiq-backend-1.4.6 lib/karafka/backends/sidekiq.rb