Sha256: 800d981f0a2431d7276305e144d381dde693459c4d5b6413b880dddf8c1e9aa9
Contents?: true
Size: 902 Bytes
Versions: 67
Compression:
Stored size: 902 Bytes
Contents
# frozen_string_literal: true

module Deimos
  module Backends
    # Default backend to produce to Kafka. Publishing is delegated to the
    # producer API mixed in from Phobos::Producer.
    class Kafka < Base
      include Phobos::Producer

      class << self
        # Shut down the producer if necessary: run the synchronous producer
        # shutdown when the producer supports it, then close the Kafka client
        # if one is present.
        # @return [Object, nil] whatever closing the client returns, or nil
        #   when there is no client.
        def shutdown_producer
          if producer.respond_to?(:sync_producer_shutdown)
            producer.sync_producer_shutdown
          end

          producer.kafka_client&.close
        end

        # :nodoc:
        def execute(producer_class:, messages:)
          # The whole publish runs inside a 'produce' instrumentation event
          # carrying the producer class, its topic and the raw payloads.
          Deimos.instrument(
            'produce',
            producer: producer_class,
            topic: producer_class.topic,
            payloads: messages.map(&:payload)
          ) do
            producer.publish_list(messages.map(&:encoded_hash))

            # Only reached when publish_list did not raise; skipped entirely
            # when no metrics provider is configured.
            metrics = Deimos.config.metrics
            metrics&.increment(
              'publish',
              tags: %W[status:success topic:#{producer_class.topic}],
              by: messages.size
            )
          end
        end
      end
    end
  end
end
Version data entries
67 entries across 67 versions & 2 rubygems