Sha256: 2c32b7aea82333d0ca2b1fd5405c81fa4fe14998b2724056d085e209fab53878

Contents?: true

Size: 1.57 KB

Versions: 59

Compression:

Stored size: 1.57 KB

Contents

# frozen_string_literal: true

module Deimos
  # Store Kafka messages into the database.
  class KafkaMessage < ActiveRecord::Base
    self.table_name = 'kafka_messages'

    validates_presence_of :topic

    # Ensure it gets turned into a string, e.g. for testing purposes. It
    # should already be a string.
    # @param mess [Object]
    def message=(mess)
      write_attribute(:message, mess ? mess.to_s : nil)
    end

    # Decoded payload for this message.
    # @return [Hash]
    def decoded_message
      self.class.decoded([self]).first
    end

    # Get a decoder to decode a set of messages on the given topic.
    # @param topic [String]
    # @return [Deimos::Consumer]
    def self.decoder(topic)
      producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
      return nil unless producer

      consumer = Class.new(Deimos::Consumer)
      consumer.config.merge!(producer.config)
      consumer
    end

    # Decoded payloads for a list of messages.
    # @param messages [Array<Deimos::KafkaMessage>]
    # @return [Array<Hash>]
    def self.decoded(messages=[])
      return [] if messages.empty?

      decoder = self.decoder(messages.first.topic)&.new
      messages.map do |m|
        {
          key: m.key.present? ? decoder&.decode_key(m.key) || m.key : nil,
          payload: decoder&.decoder&.decode(m.message) || m.message
        }
      end
    end

    # @return [Hash]
    def phobos_message
      {
        payload: self.message,
        partition_key: self.partition_key,
        key: self.key,
        topic: self.topic
      }
    end
  end
end

Version data entries

59 entries across 59 versions & 2 rubygems

Version Path
deimos-ruby-1.16.3 lib/deimos/kafka_message.rb
deimos-ruby-1.16.2 lib/deimos/kafka_message.rb
deimos-ruby-1.16.1 lib/deimos/kafka_message.rb
deimos-ruby-1.16.0 lib/deimos/kafka_message.rb
deimos-ruby-1.15.1 lib/deimos/kafka_message.rb
deimos-ruby-1.15.0 lib/deimos/kafka_message.rb
deimos-ruby-1.14.6 lib/deimos/kafka_message.rb
deimos-ruby-1.14.5 lib/deimos/kafka_message.rb
deimos-ruby-1.14.4 lib/deimos/kafka_message.rb
deimos-ruby-1.14.3 lib/deimos/kafka_message.rb
deimos-ruby-1.14.2 lib/deimos/kafka_message.rb
deimos-ruby-1.14.1 lib/deimos/kafka_message.rb
deimos-ruby-1.14.0 lib/deimos/kafka_message.rb
deimos-ruby-1.13.3 lib/deimos/kafka_message.rb
deimos-ruby-1.13.2 lib/deimos/kafka_message.rb
deimos-ruby-1.13.1 lib/deimos/kafka_message.rb
deimos-ruby-1.13.0 lib/deimos/kafka_message.rb
deimos-ruby-1.12.6 lib/deimos/kafka_message.rb
deimos-ruby-1.12.5 lib/deimos/kafka_message.rb
deimos-ruby-1.12.4 lib/deimos/kafka_message.rb