Sha256: af9e2aae0c25fbfdf6e139bbcfbf0862741cc087a59390b7e29a6f8307fe9df0

Contents?: true

Size: 1.92 KB

Versions: 3

Compression:

Stored size: 1.92 KB

Contents

# frozen_string_literal: true

module Karafka
  # Interchanger allows us to format/encode/pack data that is being send to perform_async
  # This is meant to target mostly issues with data encoding like this one:
  # https://github.com/mperham/sidekiq/issues/197
  # Each custom interchanger should implement following methods:
  #   - encode - it is meant to encode params before they get stored inside Redis
  #   - decode - decoded params back to a hash format that we can use
  #
  # This interchanger uses default Sidekiq options to exchange data
  class Interchanger
    # @param params_batch [Karafka::Params::ParamsBatch] Karafka params batch object
    # @return [Array<Hash>] Array with hash built out of params data
    def encode(params_batch)
      params_batch.map do |param|
        metadata_hash = param.metadata.to_h
        # All the metadata must have stringified keys in order to safe serialize
        metadata_hash.transform_keys!(&:to_s)
        # This will be taken back from the routing and is not safe for serialization
        metadata_hash.delete('deserializer')

        # Cast times to strings, we will de-serialize it back in Sidekiq
        metadata_hash['receive_time'] = metadata_hash['receive_time'].to_f
        metadata_hash['create_time'] = metadata_hash['create_time'].to_f

        {
          'raw_payload' => param.raw_payload,
          'metadata' => metadata_hash
        }
      end
    end

    # @param params_batch [Array<Hash>] Sidekiq params that are now an array
    # @return [Array<Hash>] exactly what we've fetched from Sidekiq
    def decode(params_batch)
      params_batch.map do |param|
        metadata = param['metadata']
        # Covert serialized dates back to what they were
        metadata['receive_time'] = Time.at(metadata['receive_time']).to_time
        metadata['create_time'] = Time.at(metadata['create_time']).to_time

        param['metadata'] = metadata

        param
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
karafka-sidekiq-backend-1.4.6 lib/karafka/interchanger.rb
karafka-sidekiq-backend-1.4.5 lib/karafka/interchanger.rb
karafka-sidekiq-backend-1.4.4 lib/karafka/interchanger.rb