Sha256: 1d7e0ddcee7df199bbea6bb97b1835f161f2e6b4c2b2f50143efcffff0fe690b

Contents?: true

Size: 1.5 KB

Versions: 1

Compression:

Stored size: 1.5 KB

Contents

module NulogyMessageBusConsumer
  class Message
    attr_reader :event_data
    attr_reader :event_data_unparsed
    attr_reader :id
    attr_reader :key
    attr_reader :offset
    attr_reader :partition
    attr_reader :subscription_id
    attr_reader :company_uuid
    attr_reader :timestamp
    attr_reader :topic
    attr_reader :created_at

    def initialize(attrs = {})
      attrs.each { |key, value| instance_variable_set("@#{key}", value) }
    end

    def self.from_kafka(kafka_message)
      envelope_data = JSON.parse(kafka_message.payload, symbolize_names: true)
      event_data =
        begin
          JSON.parse(envelope_data[:event_json], symbolize_names: true)
        rescue
          {}
        end

      new(
        event_data: event_data,
        event_data_unparsed: envelope_data[:event_json],
        id: envelope_data[:id],
        key: kafka_message.key,
        offset: kafka_message.offset,
        partition: kafka_message.partition,
        subscription_id: envelope_data[:subscription_id] || envelope_data[:public_subscription_id],
        company_uuid: envelope_data[:company_uuid] || envelope_data[:tenant_id],
        timestamp: kafka_message.timestamp,
        topic: kafka_message.topic,
        created_at: envelope_data[:created_at]
      )
    end

    def to_h
      instance_variables.each_with_object({}) do |instance_variable, hash|
        attribute_name = instance_variable.to_s.delete("@")
        hash[attribute_name.to_sym] = public_send(attribute_name)
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-2.0.1 lib/nulogy_message_bus_consumer/message.rb