Sha256: 1d7e0ddcee7df199bbea6bb97b1835f161f2e6b4c2b2f50143efcffff0fe690b
Contents?: true
Size: 1.5 KB
Versions: 1
Compression:
Stored size: 1.5 KB
Contents
require "json"

module NulogyMessageBusConsumer
  # Plain value object holding the attributes of one consumed message.
  #
  # Instances are normally built with {Message.from_kafka}, which combines
  # transport-level fields read from the Kafka message (key, offset,
  # partition, timestamp, topic) with fields read from the JSON envelope
  # carried in the message payload.
  class Message
    attr_reader :event_data,
                :event_data_unparsed,
                :id,
                :key,
                :offset,
                :partition,
                :subscription_id,
                :company_uuid,
                :timestamp,
                :topic,
                :created_at

    # Stores every key/value pair of +attrs+ as an instance variable named
    # after the key.
    #
    # @param attrs [Hash] attribute name (Symbol or String) => value
    def initialize(attrs = {})
      attrs.each { |name, value| instance_variable_set("@#{name}", value) }
    end

    # Builds a Message from a Kafka message whose payload is a JSON envelope.
    #
    # @param kafka_message [#payload, #key, #offset, #partition, #timestamp, #topic]
    # @return [Message]
    # @raise [JSON::ParserError] when the payload itself is not valid JSON
    def self.from_kafka(kafka_message)
      envelope_data = JSON.parse(kafka_message.payload, symbolize_names: true)

      # The inner event JSON is parsed on a best-effort basis: any failure
      # (missing, non-string or malformed event_json) yields an empty hash.
      # The raw value stays available through event_data_unparsed.
      event_data =
        begin
          JSON.parse(envelope_data[:event_json], symbolize_names: true)
        rescue StandardError
          {}
        end

      new(
        event_data: event_data,
        event_data_unparsed: envelope_data[:event_json],
        id: envelope_data[:id],
        key: kafka_message.key,
        offset: kafka_message.offset,
        partition: kafka_message.partition,
        # Each of these two fields accepts an alternative envelope key.
        subscription_id: envelope_data[:subscription_id] || envelope_data[:public_subscription_id],
        company_uuid: envelope_data[:company_uuid] || envelope_data[:tenant_id],
        timestamp: kafka_message.timestamp,
        topic: kafka_message.topic,
        created_at: envelope_data[:created_at]
      )
    end

    # Returns the attributes that have been set on this message.
    #
    # Values are read through the public reader when one exists. Attributes
    # that were passed to the constructor but have no public reader are read
    # directly from the instance variable instead of raising NoMethodError.
    #
    # @return [Hash{Symbol => Object}]
    def to_h
      instance_variables.each_with_object({}) do |ivar, hash|
        name = ivar.to_s.delete_prefix("@").to_sym
        hash[name] = respond_to?(name) ? public_send(name) : instance_variable_get(ivar)
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
nulogy_message_bus_consumer-2.0.1 | lib/nulogy_message_bus_consumer/message.rb |