Sha256: b4c069c377dc2b295563b039ca1b23444d68cd0d37cecbeed85aa37ec0c8e4f3

Contents?: true

Size: 1.59 KB

Versions: 25

Compression:

Stored size: 1.59 KB

Contents

# frozen_string_literal: true

module Deimos
  # ActiveRecord model for rows of the `kafka_messages` table, i.e. Kafka
  # messages persisted in the database.
  class KafkaMessage < ActiveRecord::Base
    self.table_name = 'kafka_messages'

    validates_presence_of :topic

    # Assign the message body. Any truthy value is coerced with `to_s` (it is
    # expected to be a String already; the coercion mostly helps in tests),
    # while a falsy value is written as nil.
    # @param mess [Object]
    # @return [void]
    def message=(mess)
      value = mess.to_s if mess
      write_attribute(:message, value)
    end

    # Decode this single record by delegating to {.decoded}.
    # @return [Hash] a hash with `:key` and `:payload` entries.
    def decoded_message
      decoded_records = self.class.decoded([self])
      decoded_records.first
    end

    # Build a throwaway consumer class able to decode messages for a topic.
    # The first producer class whose topic equals the given one is looked up,
    # and its config is merged into a fresh anonymous subclass of
    # Deimos::Consumer.
    # @param topic [String]
    # @return [Class, nil] the consumer class, or nil when no producer
    #   is registered for the topic.
    def self.decoder(topic)
      matching_producer = Deimos::Producer.descendants.find { |klass| klass.topic == topic }
      return unless matching_producer

      Class.new(Deimos::Consumer).tap do |consumer_class|
        consumer_class.config.merge!(matching_producer.config)
      end
    end

    # Decode a batch of messages. The decoder is chosen from the topic of the
    # first message only and is reused for every message in the list.
    # Whenever no decoder is available, or decoding yields a falsy result,
    # the raw stored value is returned instead.
    # @param messages [Array<Deimos::KafkaMessage>]
    # @return [Array<Hash>] one `{ key:, payload: }` hash per message; `key`
    #   is nil when the stored key is blank.
    def self.decoded(messages=[])
      return [] if messages.empty?

      consumer = decoder(messages.first.topic)&.new
      messages.map do |record|
        decoded_key = if record.key.present?
                        consumer&.decode_key(record.key) || record.key
                      end
        decoded_payload = consumer&.decoder&.decode(record.message) || record.message

        { key: decoded_key, payload: decoded_payload }
      end
    end

    # Hash representation of this record using the `payload`,
    # `partition_key`, `key` and `topic` keys.
    # @return [Hash]
    def phobos_message
      {
        payload: message,
        partition_key: partition_key,
        key: key,
        topic: topic
      }
    end
  end
end

Version data entries

25 entries across 25 versions & 1 rubygems

Version Path
deimos-ruby-1.22.4 lib/deimos/kafka_message.rb
deimos-ruby-1.22.3 lib/deimos/kafka_message.rb
deimos-ruby-1.22.2 lib/deimos/kafka_message.rb
deimos-ruby-1.22.1 lib/deimos/kafka_message.rb
deimos-ruby-1.22 lib/deimos/kafka_message.rb
deimos-ruby-1.20.1 lib/deimos/kafka_message.rb
deimos-ruby-1.20.0 lib/deimos/kafka_message.rb
deimos-ruby-1.19.7 lib/deimos/kafka_message.rb
deimos-ruby-1.19.6 lib/deimos/kafka_message.rb
deimos-ruby-1.19.5 lib/deimos/kafka_message.rb
deimos-ruby-1.19.4 lib/deimos/kafka_message.rb
deimos-ruby-1.19.3 lib/deimos/kafka_message.rb
deimos-ruby-1.19.2 lib/deimos/kafka_message.rb
deimos-ruby-1.19.1 lib/deimos/kafka_message.rb
deimos-ruby-1.19.1.pre.beta1 lib/deimos/kafka_message.rb
deimos-ruby-1.19.0 lib/deimos/kafka_message.rb
deimos-ruby-1.19.beta2 lib/deimos/kafka_message.rb
deimos-ruby-1.19.beta1 lib/deimos/kafka_message.rb
deimos-ruby-1.18.2 lib/deimos/kafka_message.rb
deimos-ruby-1.18.1 lib/deimos/kafka_message.rb