Sha256: af9e2514bc44fdb2d5f4cbb22e167b97bbd5827e5e529d0012b877dad47b0e5d
Contents?: true
Size: 1.94 KB
Versions: 2
Compression:
Stored size: 1.94 KB
Contents
# frozen_string_literal: true

module Karafka
  # Interchanger allows us to format/encode/pack data that is being sent to perform_async
  # This is meant to target mostly issues with data encoding like this one:
  # https://github.com/mperham/sidekiq/issues/197
  # Each custom interchanger should implement the following methods:
  #   - encode - it is meant to encode params before they get stored inside Redis
  #   - decode - it is meant to decode params back to a hash format that we can use
  #
  # This interchanger uses default Sidekiq options to exchange data
  class Interchanger
    # @param params_batch [Karafka::Params::ParamsBatch] Karafka params batch object
    # @return [Array<Hash>] Array with hash built out of params data
    def encode(params_batch)
      params_batch.map do |param|
        metadata_hash = param.metadata.to_h
        # All the metadata must have stringified keys in order to serialize safely
        metadata_hash.transform_keys!(&:to_s)
        # This will be taken back from the routing and is not safe for serialization
        metadata_hash.delete('deserializer')
        # Cast times to strings, we will de-serialize them back in Sidekiq
        metadata_hash['receive_time'] = metadata_hash['receive_time'].to_f.to_s
        metadata_hash['create_time'] = metadata_hash['create_time'].to_f.to_s

        { 'raw_payload' => param.raw_payload, 'metadata' => metadata_hash }
      end
    end

    # @param params_batch [Array<Hash>] Sidekiq params that are now an array
    # @return [Array<Hash>] exactly what we've fetched from Sidekiq
    def decode(params_batch)
      params_batch.map do |param|
        metadata = param['metadata']
        # Convert serialized dates back to what they were
        metadata['receive_time'] = Time.at(metadata['receive_time'].to_f).to_time
        metadata['create_time'] = Time.at(metadata['create_time'].to_f).to_time
        param['metadata'] = metadata
        param
      end
    end
  end
end
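The encode/decode round trip in the file above can be sketched in isolation. This is a minimal illustration, not the gem's code: `FakeMetadata`, `FakeParam`, `encode_param`, and `decode_param` are hypothetical stand-ins that mirror the logic shown, and the JSON round trip is an assumption standing in for how Sidekiq stores job arguments in Redis.

```ruby
require 'json'

# Hypothetical stand-ins for Karafka's params objects (assumptions for this sketch only)
FakeMetadata = Struct.new(:topic, :deserializer, :receive_time, :create_time, keyword_init: true)
FakeParam = Struct.new(:raw_payload, :metadata)

# Mirrors the encode step: stringify keys, drop the deserializer, cast times to strings
def encode_param(param)
  metadata_hash = param.metadata.to_h.transform_keys(&:to_s)
  metadata_hash.delete('deserializer')
  metadata_hash['receive_time'] = metadata_hash['receive_time'].to_f.to_s
  metadata_hash['create_time'] = metadata_hash['create_time'].to_f.to_s
  { 'raw_payload' => param.raw_payload, 'metadata' => metadata_hash }
end

# Mirrors the decode step: turn the stringified floats back into Time objects
def decode_param(param)
  metadata = param['metadata']
  metadata['receive_time'] = Time.at(metadata['receive_time'].to_f)
  metadata['create_time'] = Time.at(metadata['create_time'].to_f)
  param
end

now = Time.at(1_600_000_000.5)
param = FakeParam.new(
  '{"a":1}',
  FakeMetadata.new(topic: 'events', deserializer: ->(x) { x }, receive_time: now, create_time: now)
)

encoded = encode_param(param)
# Simulate the job arguments passing through JSON on their way to and from Redis
restored = JSON.parse(JSON.generate([encoded]))
decoded = restored.map { |item| decode_param(item) }
```

The deserializer is dropped because a callable cannot be represented in JSON, and the times travel as strings because JSON has no time type. Going through a float keeps sub-second precision only as far as a float can represent it.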
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
karafka-sidekiq-backend-1.4.9 | lib/karafka/interchanger.rb |
karafka-sidekiq-backend-1.4.7 | lib/karafka/interchanger.rb |