# frozen_string_literal: true

module RailwayIpc
  # Singleton publisher layered on Sneakers::Publisher. Each subclass names
  # its exchange through the `exchange` class macro; the exchange is declared
  # with type :fanout. Every call to #publish logs a deprecation warning
  # pointing at the newer publisher class.
  class SingletonPublisher < Sneakers::Publisher
    include ::Singleton

    class << self
      # Class macro: remembers the exchange name for this class.
      # The name is kept in a class-level instance variable.
      def exchange(exchange)
        @exchange_name = exchange
      end

      # Returns the configured exchange name.
      # Raises RuntimeError when no exchange has been set on this class.
      def exchange_name
        @exchange_name || raise('Subclass must set the exchange')
      end
    end

    def initialize
      super(exchange: self.class.exchange_name, exchange_type: :fanout)
    end

    # Publishes +message+ through the superclass and then records it in
    # +published_message_store+.
    #
    # A uuid and a correlation_id are generated when the message has blank
    # values for them. Returns whatever the superclass #publish returns.
    # Logs and re-raises RailwayIpc::InvalidProtobuf.
    def publish(message, published_message_store=RailwayIpc::PublishedMessage)
      RailwayIpc.logger.warn('DEPRECATED: Use new PublisherInstance class', log_message_options)
      ensure_message_uuid(message)
      ensure_correlation_id(message)
      RailwayIpc.logger.info('Publishing message', log_message_options(message))

      encoded = RailwayIpc::Rabbitmq::Payload.encode(message)
      # The message is stored only after the superclass publish has returned;
      # `tap` keeps the publish result as this method's return value.
      super(encoded).tap do
        published_message_store.store_message(self.class.exchange_name, message)
      end
    rescue RailwayIpc::InvalidProtobuf
      RailwayIpc.logger.error('Invalid protobuf', log_message_options(message))
      raise
    end

    private

    # Assigns a fresh UUID when the message's uuid is blank; returns the message.
    def ensure_message_uuid(message)
      message.tap { |msg| msg.uuid = SecureRandom.uuid if msg.uuid.blank? }
    end

    # Assigns a fresh correlation id when it is blank; returns the message.
    def ensure_correlation_id(message)
      message.tap { |msg| msg.correlation_id = SecureRandom.uuid if msg.correlation_id.blank? }
    end

    # Builds the structured options hash passed to the logger. The :protobuf
    # entry is included only when a message is given.
    def log_message_options(message=nil)
      base = { feature: 'railway_ipc_publisher', exchange: self.class.exchange_name }
      return base if message.nil?

      base.merge(protobuf: { type: message.class, data: message })
    end
  end
end

module RailwayIpc
  # Publisher bound to a single named exchange. The outgoing message is
  # handed to the message store first and then published to the exchange.
  class Publisher
    attr_reader :exchange_name, :message_store

    # opts:
    #   :exchange_name  - required (KeyError from `fetch` when absent)
    #   :message_store  - optional, defaults to RailwayIpc::PublishedMessage
    def initialize(opts={})
      @exchange_name = opts.fetch(:exchange_name)
      @message_store = opts.fetch(:message_store, RailwayIpc::PublishedMessage)
    end

    # Stores +message+ and publishes its encoded payload to the exchange.
    #
    # A uuid and a correlation_id are generated when the message has blank
    # values for them. Returns the result of `exchange.publish`.
    #
    # Raises:
    #   RailwayIpc::InvalidProtobuf              - logged, then re-raised
    #   RailwayIpc::FailedToStoreOutgoingMessage - wraps ActiveRecord::RecordInvalid
    #   StandardError                            - re-raised after destroying the
    #                                              stored record, if one exists
    #
    # rubocop:disable Metrics/AbcSize
    def publish(message)
      message.uuid = SecureRandom.uuid if message.uuid.blank?
      message.correlation_id = SecureRandom.uuid if message.correlation_id.blank?
      RailwayIpc.logger.info('Publishing message', log_message_options(message))

      stored_message = message_store.store_message(exchange_name, message)
      payload = RailwayIpc::Rabbitmq::Payload.encode(message)
      exchange.publish(payload)
    rescue RailwayIpc::InvalidProtobuf
      RailwayIpc.logger.error('Invalid protobuf', log_message_options(message))
      raise
    rescue ActiveRecord::RecordInvalid => e
      RailwayIpc.logger.error('Failed to store outgoing message', log_message_options(message))
      raise RailwayIpc::FailedToStoreOutgoingMessage.new(e)
    rescue StandardError
      # stored_message is nil when the failure happened before the store
      # call returned, hence the safe navigation.
      stored_message&.destroy
      raise
    end
    # rubocop:enable Metrics/AbcSize

    # Memoized exchange obtained from the channel: fanout, durable,
    # not auto-deleted, no extra arguments.
    def exchange
      @exchange ||= channel.exchange(
        exchange_name,
        type: :fanout,
        durable: true,
        auto_delete: false,
        arguments: {}
      )
    end

    private

    # Channel supplied by the connection manager singleton.
    def channel
      RailwayIpc::ConnectionManager.instance.channel
    end

    # Builds the structured options hash passed to the logger for +message+.
    def log_message_options(message)
      {
        feature: 'railway_ipc_publisher',
        exchange: exchange_name,
        protobuf: { type: message.class, data: message }
      }
    end
  end
end