Sha256: 12c9e027ef716501f820df9c9c6795c848ac27134d33f2753a816c3b316e8566

Contents?: true

Size: 1.63 KB

Versions: 13

Compression:

Stored size: 1.63 KB

Contents

# frozen_string_literal: true

module Deimos
  # Store Kafka messages into the database.
  class KafkaMessage < ActiveRecord::Base
    self.table_name = 'kafka_messages'

    validates_presence_of :topic

    # Ensure it gets turned into a string, e.g. for testing purposes. It
    # should already be a string.
    # @param mess [Object]
    # @return [void]
    def message=(mess)
      write_attribute(:message, mess ? mess.to_s : nil)
    end

    # Decoded payload for this message.
    # @return [Hash]
    def decoded_message
      self.class.decoded([self]).first
    end

    # Get a decoder to decode a set of messages on the given topic.
    # @param topic [String]
    # @return [Deimos::Consumer]
    def self.decoder(topic)
      producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
      return nil unless producer

      consumer = Class.new(Deimos::Consumer)
      consumer.config.merge!(producer.config)
      consumer
    end

    # Decoded payloads for a list of messages.
    # @param messages [Array<Deimos::KafkaMessage>]
    # @return [Array<Hash>]
    def self.decoded(messages=[])
      return [] if messages.empty?

      decoder_class = self.decoder(messages.first.topic)
      decoder = decoder_class&.new
      messages.map do |m|
        {
          key: m.key.present? ? decoder&.decode_key(m.key) || m.key : nil,
          payload: decoder_class&.decoder&.decode(m.message) || m.message
        }
      end
    end

    # @return [Hash]
    def phobos_message
      {
        payload: self.message,
        partition_key: self.partition_key,
        key: self.key,
        topic: self.topic
      }
    end
  end
end

Version data entries

13 entries across 13 versions & 1 rubygems

Version Path
deimos-ruby-1.24.2 lib/deimos/kafka_message.rb
deimos-ruby-1.24.1 lib/deimos/kafka_message.rb
deimos-ruby-1.24.0 lib/deimos/kafka_message.rb
deimos-ruby-1.23.3 lib/deimos/kafka_message.rb
deimos-ruby-1.23.2 lib/deimos/kafka_message.rb
deimos-ruby-1.23.1.pre.beta6 lib/deimos/kafka_message.rb
deimos-ruby-1.23.1.pre.beta5 lib/deimos/kafka_message.rb
deimos-ruby-1.23.1.pre.beta4 lib/deimos/kafka_message.rb
deimos-ruby-1.23.1.pre.beta3 lib/deimos/kafka_message.rb
deimos-ruby-1.23.1.pre.beta2 lib/deimos/kafka_message.rb
deimos-ruby-1.23.1.pre.beta1 lib/deimos/kafka_message.rb
deimos-ruby-1.23.0 lib/deimos/kafka_message.rb
deimos-ruby-1.22.5 lib/deimos/kafka_message.rb