Sha256: 0ba8d9a110a34f269a59865131e6eb3904db160d441667f911f67aab04d5552a
Contents?: true
Size: 1.23 KB
Versions: 83
Compression:
Stored size: 1.23 KB
Contents
# frozen_string_literal: true require 'deimos/kafka_message' module Deimos module Backends # Backend which saves messages to the database instead of immediately # sending them. class Db < Base class << self # :nodoc: def execute(producer_class:, messages:) records = messages.map do |m| message = Deimos::KafkaMessage.new( message: m.encoded_payload ? m.encoded_payload.to_s.b : nil, topic: m.topic, partition_key: partition_key_for(m) ) message.key = m.encoded_key.to_s.b unless producer_class.config[:no_keys] message end Deimos::KafkaMessage.import(records) Deimos.config.metrics&.increment( 'db_producer.insert', tags: %W(topic:#{producer_class.topic}), by: records.size ) end # @param message [Deimos::Message] # @return [String] the partition key to use for this message def partition_key_for(message) return message.partition_key if message.partition_key.present? return message.key unless message.key.is_a?(Hash) message.key.to_yaml end end end end end
Version data entries
83 entries across 83 versions & 2 rubygems