lib/bbk/amqp/message.rb in bbk-amqp-1.1.0.204853 vs lib/bbk/amqp/message.rb in bbk-amqp-1.1.1.273342

- old
+ new

@@ -1,30 +1,27 @@ # frozen_string_literal: true +require 'bbk/app/dispatcher/message' module BBK module AMQP # Store information about consumed AMQP message - class Message + class Message < BBK::App::Dispatcher::Message - attr_reader :consumer, :headers, :body, :payload, :delivery_info, :properties + attr_reader :properties def initialize(consumer, delivery_info, properties, body) - @consumer = consumer @properties = properties.to_h.with_indifferent_access - @body = body + headers = @properties.except(:headers).merge(@properties[:headers].presence || {}).with_indifferent_access + amqp_consumer = delivery_info[:consumer] - @delivery_info = delivery_info.to_h.merge( + delivery_info = delivery_info.to_h.merge( message_consumer: consumer, protocols: consumer.protocols, queue: amqp_consumer&.queue_name ) - @headers = @properties.except(:headers).merge(properties[:headers].presence || {}).with_indifferent_access - @payload = begin - Oj.load(body).with_indifferent_access - rescue StandardError - {}.with_indifferent_access - end + + super(consumer, delivery_info, headers, body) end def message_id headers[:message_id] end @@ -33,17 +30,9 @@ headers[:reply_to] end def user_id headers[:user_id] - end - - def ack(*args, answer: nil, **kwargs) - consumer.ack(self, *args, answer: answer, **kwargs) - end - - def nack(*args, error: nil, **kwargs) - consumer.nack(self, *args, error: error, **kwargs) end def clone self.class.new(consumer, delivery_info, properties, body) end