lib/bbk/amqp/message.rb in bbk-amqp-1.1.0.204853 vs lib/bbk/amqp/message.rb in bbk-amqp-1.1.1.273342
- old
+ new
@@ -1,30 +1,27 @@
# frozen_string_literal: true
+require 'bbk/app/dispatcher/message'
module BBK
module AMQP
# Store information about consumed AMQP message
- class Message
+ class Message < BBK::App::Dispatcher::Message
- attr_reader :consumer, :headers, :body, :payload, :delivery_info, :properties
+ attr_reader :properties
def initialize(consumer, delivery_info, properties, body)
- @consumer = consumer
@properties = properties.to_h.with_indifferent_access
- @body = body
+ headers = @properties.except(:headers).merge(@properties[:headers].presence || {}).with_indifferent_access
+
amqp_consumer = delivery_info[:consumer]
- @delivery_info = delivery_info.to_h.merge(
+ delivery_info = delivery_info.to_h.merge(
message_consumer: consumer,
protocols: consumer.protocols,
queue: amqp_consumer&.queue_name
)
- @headers = @properties.except(:headers).merge(properties[:headers].presence || {}).with_indifferent_access
- @payload = begin
- Oj.load(body).with_indifferent_access
- rescue StandardError
- {}.with_indifferent_access
- end
+
+ super(consumer, delivery_info, headers, body)
end
def message_id
headers[:message_id]
end
@@ -33,17 +30,9 @@
headers[:reply_to]
end
def user_id
headers[:user_id]
- end
-
- def ack(*args, answer: nil, **kwargs)
- consumer.ack(self, *args, answer: answer, **kwargs)
- end
-
- def nack(*args, error: nil, **kwargs)
- consumer.nack(self, *args, error: error, **kwargs)
end
def clone
self.class.new(consumer, delivery_info, properties, body)
end