Sha256: 737a0045c72e8bbba4d132c4c2b66f5c7a141579669fdc49ea1697cbd406751f
Contents?: true
Size: 1.56 KB
Versions: 2
Compression:
Stored size: 1.56 KB
Contents
# frozen_string_literal: true

module RailwayIpc
  # Base class for Railway IPC queue consumers.
  #
  # Subclasses declare the queue/exchange they consume with {.listen_to} and
  # map message types to handler classes with {.handle}. Each subclass keeps
  # its own handler registry (see {.inherited}).
  #
  # NOTE(review): `from_queue`, `ack!` and `queue` are not defined in this
  # file; they are presumably supplied by Sneakers::Worker -- confirm.
  class Consumer
    include Sneakers::Worker

    # Gives every subclass its own class-level `handlers` reader, backed by a
    # lazily created RailwayIpc::HandlerStore held in that subclass's own
    # `@handlers` instance variable.
    #
    # @param base [Class] the class that is inheriting from Consumer
    # @return [void]
    def self.inherited(base)
      super
      base.instance_eval do
        def handlers
          @handlers ||= RailwayIpc::HandlerStore.new
        end
      end
    end

    # Declares which queue and exchange this consumer reads from.
    #
    # The defaults below are merged with +options+, so any key present in
    # +options+ takes precedence over the default. An info log entry is
    # written whenever +options+ is non-empty.
    #
    # @param queue [String] queue name passed to `from_queue`
    # @param exchange [String] exchange name stored under the :exchange key
    # @param options [Hash] extra/overriding settings merged over the defaults
    # @return [Object] whatever `from_queue` returns
    def self.listen_to(queue:, exchange:, options: {})
      unless options.empty?
        RailwayIpc.logger.info(
          "Overriding configuration for #{queue} with new options",
          feature: 'railway_ipc_consumer',
          options: options
        )
      end

      from_queue queue, {
        exchange: exchange,
        durable: true,
        exchange_type: :fanout,
        connection: RailwayIpc.bunny_connection
      }.merge(options)
    end

    # Registers +with+ as the handler for +message_type+ in this class's
    # handler store.
    #
    # @param message_type [Object] the message identifier to register
    # @param with [Class] the handler; {#get_handler} later calls `.new` on it
    # @return [Object] whatever the handler store's `register` returns
    def self.handle(message_type, with:)
      handlers.register(message: message_type, handler: with)
    end

    # @return [Object] the handler store belonging to this consumer's class
    def handlers
      self.class.handlers
    end

    # @return [Object] the value of `registered` on the handler store
    def registered_handlers
      handlers.registered
    end

    # @return [Object] the name reported by this worker's queue
    def queue_name
      queue.name
    end

    # @return [Object] the :exchange entry of this worker's queue options
    def exchange_name
      queue.opts[:exchange]
    end

    # Processes one raw payload: wraps it in an IncomingMessage, runs it
    # through ProcessIncomingMessage, and then calls `ack!`.
    #
    # Any StandardError is logged with the exchange, queue, error class and
    # payload, and is then re-raised to the caller.
    #
    # @param payload [Object] the raw message payload
    # @return [Object] the result of `ack!` when processing succeeds
    # @raise [StandardError] the original error, after it has been logged
    def work(payload)
      message = RailwayIpc::IncomingMessage.new(payload)
      RailwayIpc::ProcessIncomingMessage.call(self, message)
      ack!
    rescue StandardError => e
      RailwayIpc.logger.error(
        e.message,
        feature: 'railway_ipc_consumer',
        exchange: exchange_name,
        queue: queue_name,
        error: e.class,
        payload: payload
      )
      # Bare `raise` re-raises the exception currently being handled.
      raise
    end

    # Builds a new handler instance for the given message type.
    #
    # @param type [Object] the message type to look up in the handler store
    # @return [Object, nil] a new instance of the registered handler, or nil
    #   when the store has no entry for +type+
    def get_handler(type)
      manifest = handlers.get(type)
      manifest.handler.new if manifest
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
railway-ipc-2.2.2 | lib/railway_ipc/consumer/consumer.rb |
railway-ipc-2.2.1 | lib/railway_ipc/consumer/consumer.rb |