Sha256: 7d647dfb82eb4e7cf57b27a546d38d795fd5a6908206ff03d4601adebb8d5279

Contents?: true

Size: 1.33 KB

Versions: 4

Compression:

Stored size: 1.33 KB

Contents

# frozen_string_literal: true

module Deimos
  # Store Kafka messages into the database.
  class KafkaMessage < ActiveRecord::Base
    self.table_name = 'kafka_messages'

    validates_presence_of :topic

    # Ensure it gets turned into a string, e.g. for testing purposes. It
    # should already be a string.
    # @param mess [Object]
    def message=(mess)
      write_attribute(:message, mess ? mess.to_s : nil)
    end

    # @return [Deimos::Consumer]
    def decoder
      producer = Deimos::Producer.descendants.find { |c| c.topic == self.topic }
      return nil unless producer

      consumer = Class.new(Deimos::Consumer)
      consumer.config.merge!(producer.config)
      consumer
    end

    # Decode the message. This assumes for now that we have access to a producer
    # in the codebase which can decode it.
    # @param decoder [Deimos::Consumer]
    # @return [Hash]
    def decoded_message(decoder=self.decoder)
      return { key: self.key, message: self.message } unless decoder

      {
        key: self.key.present? ? decoder.new.decode_key(self.key) : nil,
        payload: decoder.decoder.decode(self.message)
      }
    end

    # @return [Hash]
    def phobos_message
      {
        payload: self.message,
        partition_key: self.partition_key,
        key: self.key,
        topic: self.topic
      }
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
deimos-ruby-1.1.0.pre.beta2 lib/deimos/kafka_message.rb
deimos-ruby-1.1.0.pre.beta1 lib/deimos/kafka_message.rb
deimos-ruby-1.0.0 lib/deimos/kafka_message.rb
deimos-ruby-1.0.0.pre.beta26 lib/deimos/kafka_message.rb