Sha256: b51cc3545cbee56bee4a87064388341d2dd1e5a566088b256267d15da879ebac

Contents?: true

Size: 1.27 KB

Versions: 20

Compression:

Stored size: 1.27 KB

Contents

# frozen_string_literal: true

require 'deimos/kafka_message'

module Deimos
  module Backends
    # Backend which saves messages to the database instead of immediately
    # sending them.
    class Outbox < Base
      class << self
        # :nodoc:
        def execute(producer_class:, messages:)
          records = messages.map do |m|
            Deimos::ProducerMiddleware.call(m)
            message = Deimos::KafkaMessage.new(
              message: m[:payload] ? m[:payload].to_s.b : nil,
              topic: m[:topic],
              partition_key: partition_key_for(m)
            )
            message.key = m[:key].to_s.b if m[:key]
            message
          end
          Deimos::KafkaMessage.import(records)
          Deimos.config.metrics&.increment(
            'outbox.insert',
            tags: %W(topic:#{producer_class.topic}),
            by: records.size
          )
        end

        # @param message [Deimos::Message]
        # @return [String] the partition key to use for this message
        def partition_key_for(message)
          if message[:partition_key].present?
            message[:partition_key]
          elsif message[:key].present?
            message[:key].to_s.b
          else
            nil
          end
        end
      end
    end
  end
end

Version data entries

20 entries across 20 versions & 1 rubygems

Version Path
deimos-ruby-2.0.5 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.4 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.3 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.2 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.1 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.0 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.0.pre.beta7 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.0.pre.beta6 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.0.pre.beta5 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.0.pre.beta4 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.0.pre.beta3 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.0.pre.beta2 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.0.pre.beta1 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.0.pre.alpha7 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.0.pre.alpha6 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.0.pre.alpha5 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.0.pre.alpha4 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.0.pre.alpha3 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.0.pre.alpha2 lib/deimos/backends/outbox.rb
deimos-ruby-2.0.0.pre.alpha1 lib/deimos/backends/outbox.rb