lib/advanced_sneakers_activejob/publisher.rb in advanced-sneakers-activejob-0.2.3 vs lib/advanced_sneakers_activejob/publisher.rb in advanced-sneakers-activejob-0.3.0

- old
+ new

@@ -1,210 +1,33 @@ # frozen_string_literal: true module AdvancedSneakersActiveJob - # Based on Sneakers::Publisher, but refactored to utilize :mandatory option to handle unrouted messages - # http://rubybunny.info/articles/exchanges.html#publishing_messages_as_mandatory - class Publisher - WAIT_FOR_UNROUTED_MESSAGES_AT_EXIT_TIMEOUT = 30 + class Publisher < ::BunnyPublisher::Base + include ::BunnyPublisher::Mandatory - delegate :sneakers, :handle_unrouted_messages, :delayed_queue_prefix, - to: :'AdvancedSneakersActiveJob.config', prefix: :config + before_publish :log_message - delegate :logger, to: :'ActiveJob::Base' + delegate :logger, to: :'::ActiveJob::Base' - attr_reader :publish_channel, :republish_channel, - :publish_exchange, :republish_exchange, - :publish_delayed_exchange, :republish_delayed_exchange + delegate :handle_unrouted_messages, + to: :'AdvancedSneakersActiveJob.config', + prefix: :config - def initialize - @mutex = Mutex.new - at_exit { wait_for_unrouted_messages_processing(timeout: WAIT_FOR_UNROUTED_MESSAGES_AT_EXIT_TIMEOUT) } - end - - def publish(message, routing_key: nil, headers: {}, **properties) - ensure_connection! - - logger.debug "Publishing <#{message}> to [#{publish_exchange.name}] with routing_key [#{routing_key}]" - - params = properties.deep_symbolize_keys.merge( - routing_key: routing_key, - mandatory: true, - content_type: AdvancedSneakersActiveJob::CONTENT_TYPE, - headers: headers - ) - - publish_exchange.publish(message, params) - end - - def publish_delayed(message, delay:, routing_key: nil, headers: {}, **properties) - ensure_connection! - - logger.debug "Publishing <#{message}> to [#{publish_delayed_exchange.name}] with routing_key [#{routing_key}] and delay [#{delay}]" - - params = properties.deep_symbolize_keys.merge( - routing_key: routing_key, - mandatory: true, - content_type: AdvancedSneakersActiveJob::CONTENT_TYPE, - headers: headers.deep_symbolize_keys.merge(delay: delay.to_i) # do not use x- prefix because headers exchanges ignore such headers - ) - - publish_delayed_exchange.publish(message, params) - end - private - def ensure_connection! - @mutex.synchronize do - unless connected? - start_connections! - create_channels! - configure_exchanges! - end + def log_message(publisher, message, options = {}) + logger.debug do + "Publishing <#{message}> to [#{publisher.exchange.name}] with routing_key [#{options[:routing_key]}]" end end - def start_connections! - @publish_connection ||= create_bunny_connection - @publish_connection.start - - @republish_connection ||= create_bunny_connection - @republish_connection.start - end - - def create_channels! - @publish_channel = @publish_connection.create_channel - @republish_channel = @republish_connection.create_channel - end - - def configure_exchanges! - @publish_exchange = build_exchange(@publish_channel) - @publish_exchange.on_return { |*attrs| handle_unrouted_messages(*attrs) } - - @publish_delayed_exchange = build_delayed_exchange(@publish_channel) - @publish_delayed_exchange.on_return { |*attrs| handle_unrouted_delayed_messages(*attrs) } - - @republish_exchange = build_exchange(republish_channel) - @republish_delayed_exchange = build_delayed_exchange(republish_channel) - end - - def connected? - @publish_connection&.connected? && - @republish_connection&.connected? && - @publish_channel && - @republish_channel - end - - # Returned messages are processed asynchronously and there is a probability for messages loses on program exit or network failure. - # Second connection is required because `on_return` is called within a frameset of amqp connection. - # Any interaction within the connection (even by another channel) can lead to connection error. - # https://github.com/ruby-amqp/bunny/blob/7fb05abf36637557f75a69790be78f9cc1cea807/lib/bunny/session.rb#L683 - def handle_unrouted_messages(return_info, properties, message) - @unrouted_message = true - - params = { message: message, return_info: return_info, properties: properties } - - raise(PublishError, params) if return_info.reply_code != 312 # NO_ROUTE - + def on_message_return(return_info, properties, message) if config_handle_unrouted_messages - setup_routing_and_republish_message(params) + super else - logger.warn("Message is not routed! #{params}") + logger.warn do + "Message is not routed! #{{ message: message, return_info: return_info, properties: properties }}" + end end - - @unrouted_message = false - end - - def handle_unrouted_delayed_messages(return_info, properties, message) - @unrouted_delayed_message = true - - params = { message: message, return_info: return_info, properties: properties } - - raise(PublishError, params) if return_info.reply_code != 312 # NO_ROUTE - - setup_routing_and_republish_delayed_message(params) - - @unrouted_delayed_message = false - end - - # TODO: introduce more reliable way to wait for handling of unrouted messages at exit - def wait_for_unrouted_messages_processing(timeout:) - sleep(0.05) # gives publish_exchange some time to receive retuned message - - return unless @unrouted_message || @unrouted_delayed_message - - logger.warn("Waiting up to #{timeout} seconds for unrouted messages handling") - - Timeout.timeout(timeout) { sleep 0.01 while @unrouted_message || @unrouted_delayed_message } - rescue Timeout::Error - logger.warn('Some unrouted messages are lost on process exit!') - end - - def setup_routing_and_republish_message(message:, return_info:, properties:) - logger.debug("Performing queue/binding setup & re-publish for unrouted message. #{{ message: message, return_info: return_info }}") - - routing_key = return_info.routing_key - - create_queue_and_binding(queue_name: deserialize(message).fetch('queue_name'), routing_key: routing_key) - - logger.debug "Re-publishing <#{message}> to [#{republish_exchange.name}] with routing_key [#{routing_key}]" - republish_exchange.publish(message, properties.to_h.merge(routing_key: routing_key)) - end - - def create_queue_and_binding(queue_name:, routing_key:) - logger.debug "Creating queue [#{queue_name}] and binding with routing_key [#{routing_key}] to [#{republish_exchange.name}]" - republish_channel.queue(queue_name, config_sneakers[:queue_options]).tap do |queue| - queue.bind(republish_exchange, routing_key: routing_key) - republish_channel.deregister_queue(queue) # we are not going to work with this queue in this channel - end - end - - def setup_routing_and_republish_delayed_message(message:, return_info:, properties:) - delay = properties.headers.fetch('delay').to_i - queue_name = delayed_queue_name(delay: delay) - - logger.debug "Creating delayed queue [#{queue_name}]" - - create_delayed_queue_and_binding(queue_name: queue_name, delay: delay) - - republish_delayed_exchange.publish message, properties.to_h.merge(routing_key: return_info.routing_key) - end - - def delayed_queue_name(delay:) - [ - ::ActiveJob::Base.queue_name_prefix, - [config_delayed_queue_prefix, delay].join(':') - ].compact.join(::ActiveJob::Base.queue_name_delimiter) - end - - def create_delayed_queue_and_binding(queue_name:, delay:) - queue_arguments = { - 'x-queue-mode' => 'lazy', # tell RabbitMQ not to use RAM for this queue as it won't be consumed - 'x-message-ttl' => delay * 1000, # make messages die after requested time - 'x-dead-letter-exchange' => republish_exchange.name # died messages go to original exchange and then routed to consumers - } - - republish_channel.queue(queue_name, durable: true, arguments: queue_arguments).tap do |queue| - queue.bind(republish_delayed_exchange, arguments: { delay: delay }) - republish_channel.deregister_queue(queue) # we are not going to work with this queue in this channel - end - end - - def build_exchange(channel) - channel.exchange(config_sneakers[:exchange], config_sneakers[:exchange_options]) - end - - def build_delayed_exchange(channel) - channel.exchange([config_sneakers[:exchange], 'delayed'].join('-'), type: 'headers', durable: true) - end - - def create_bunny_connection - Bunny.new config_sneakers[:amqp], - vhost: config_sneakers[:vhost], - heartbeat: config_sneakers[:heartbeat], - properties: config_sneakers.fetch(:properties, {}) - end - - def deserialize(message) - Sneakers::ContentType.deserialize(message, AdvancedSneakersActiveJob::CONTENT_TYPE) end end end