Sha256: 6fabdcdf22ba5a21c6eef0c0d80073c5295280f81f68fbe3760d4316c4dca589

Contents?: true

Size: 2 KB

Versions: 59

Compression:

Stored size: 2 KB

Contents

# frozen_string_literal: true

module Deimos
  # Basically a struct to hold the message as it's processed.
  class Message
    attr_accessor :payload, :key, :partition_key, :encoded_key,
                  :encoded_payload, :topic, :producer_name

    # @param payload [Hash]
    # @param producer [Class]
    def initialize(payload, producer, topic: nil, key: nil, partition_key: nil)
      @payload = payload&.with_indifferent_access
      @producer_name = producer&.name
      @topic = topic
      @key = key
      @partition_key = partition_key
    end

    # Add message_id and timestamp default values if they are in the
    # schema and don't already have values.
    # @param fields [Array<String>] existing name fields in the schema.
    def add_fields(fields)
      return if @payload.except(:payload_key, :partition_key).blank?

      if fields.include?('message_id')
        @payload['message_id'] ||= SecureRandom.uuid
      end
      if fields.include?('timestamp')
        @payload['timestamp'] ||= Time.now.in_time_zone.to_s
      end
    end

    # @param encoder [Deimos::SchemaBackends::Base]
    def coerce_fields(encoder)
      return if payload.nil?

      @payload = encoder.coerce(@payload)
    end

    # @return [Hash]
    def encoded_hash
      {
        topic: @topic,
        key: @encoded_key,
        partition_key: @partition_key || @encoded_key,
        payload: @encoded_payload,
        metadata: {
          decoded_payload: @payload,
          producer_name: @producer_name
        }
      }
    end

    # @return [Hash]
    def to_h
      {
        topic: @topic,
        key: @key,
        partition_key: @partition_key || @key,
        payload: @payload,
        metadata: {
          decoded_payload: @payload,
          producer_name: @producer_name
        }
      }
    end

    # @param other [Message]
    # @return [Boolean]
    def ==(other)
      self.to_h == other.to_h
    end

    # @return [Boolean] True if this message is a tombstone
    def tombstone?
      payload.nil?
    end
  end
end

Version data entries

59 entries across 59 versions & 2 rubygems

Version Path
deimos-ruby-1.16.3 lib/deimos/message.rb
deimos-ruby-1.16.2 lib/deimos/message.rb
deimos-ruby-1.16.1 lib/deimos/message.rb
deimos-ruby-1.16.0 lib/deimos/message.rb
deimos-ruby-1.15.1 lib/deimos/message.rb
deimos-ruby-1.15.0 lib/deimos/message.rb
deimos-ruby-1.14.6 lib/deimos/message.rb
deimos-ruby-1.14.5 lib/deimos/message.rb
deimos-ruby-1.14.4 lib/deimos/message.rb
deimos-ruby-1.14.3 lib/deimos/message.rb
deimos-ruby-1.14.2 lib/deimos/message.rb
deimos-ruby-1.14.1 lib/deimos/message.rb
deimos-ruby-1.14.0 lib/deimos/message.rb
deimos-ruby-1.13.3 lib/deimos/message.rb
deimos-ruby-1.13.2 lib/deimos/message.rb
deimos-ruby-1.13.1 lib/deimos/message.rb
deimos-ruby-1.13.0 lib/deimos/message.rb
deimos-ruby-1.12.6 lib/deimos/message.rb
deimos-ruby-1.12.5 lib/deimos/message.rb
deimos-ruby-1.12.4 lib/deimos/message.rb