lib/advanced_sneakers_activejob/publisher.rb in advanced-sneakers-activejob-0.2.3 vs lib/advanced_sneakers_activejob/publisher.rb in advanced-sneakers-activejob-0.3.0
- old
+ new
@@ -1,210 +1,33 @@
# frozen_string_literal: true
module AdvancedSneakersActiveJob
- # Based on Sneakers::Publisher, but refactored to utilize the :mandatory option to handle unrouted messages (a standalone sketch follows #publish below)
- # http://rubybunny.info/articles/exchanges.html#publishing_messages_as_mandatory
- class Publisher
- WAIT_FOR_UNROUTED_MESSAGES_AT_EXIT_TIMEOUT = 30
+ class Publisher < ::BunnyPublisher::Base
+ include ::BunnyPublisher::Mandatory
- delegate :sneakers, :handle_unrouted_messages, :delayed_queue_prefix,
- to: :'AdvancedSneakersActiveJob.config', prefix: :config
+ before_publish :log_message
- delegate :logger, to: :'ActiveJob::Base'
+ delegate :logger, to: :'::ActiveJob::Base'
- attr_reader :publish_channel, :republish_channel,
- :publish_exchange, :republish_exchange,
- :publish_delayed_exchange, :republish_delayed_exchange
+ delegate :handle_unrouted_messages,
+ to: :'AdvancedSneakersActiveJob.config',
+ prefix: :config
- def initialize
- @mutex = Mutex.new
- at_exit { wait_for_unrouted_messages_processing(timeout: WAIT_FOR_UNROUTED_MESSAGES_AT_EXIT_TIMEOUT) }
- end
-
- def publish(message, routing_key: nil, headers: {}, **properties)
- ensure_connection!
-
- logger.debug "Publishing <#{message}> to [#{publish_exchange.name}] with routing_key [#{routing_key}]"
-
- params = properties.deep_symbolize_keys.merge(
- routing_key: routing_key,
- mandatory: true,
- content_type: AdvancedSneakersActiveJob::CONTENT_TYPE,
- headers: headers
- )
-
- publish_exchange.publish(message, params)
- end
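
For reference, a minimal standalone Bunny sketch of the mandatory-publish mechanism this class relies on; the exchange name and routing key are hypothetical, and reply code 312 is AMQP NO_ROUTE:

    require 'bunny'

    connection = Bunny.new.start
    channel    = connection.create_channel
    exchange   = channel.exchange('activejob', type: :direct, durable: true) # hypothetical exchange

    # With mandatory: true, a message that matches no binding is not dropped by the broker
    # but returned to the publisher asynchronously via basic.return / on_return.
    exchange.on_return do |return_info, _properties, payload|
      warn "Unrouted message: #{payload}" if return_info.reply_code == 312 # NO_ROUTE
    end

    exchange.publish('{"job":"example"}', routing_key: 'no_such_queue', mandatory: true)

    sleep 0.5 # give the broker time to deliver the return frame
    connection.close
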
-
- def publish_delayed(message, delay:, routing_key: nil, headers: {}, **properties)
- ensure_connection!
-
- logger.debug "Publishing <#{message}> to [#{publish_delayed_exchange.name}] with routing_key [#{routing_key}] and delay [#{delay}]"
-
- params = properties.deep_symbolize_keys.merge(
- routing_key: routing_key,
- mandatory: true,
- content_type: AdvancedSneakersActiveJob::CONTENT_TYPE,
- headers: headers.deep_symbolize_keys.merge(delay: delay.to_i) # no x- prefix: headers exchanges ignore x-* headers when matching bindings
- )
-
- publish_delayed_exchange.publish(message, params)
- end
-
private
- def ensure_connection!
- @mutex.synchronize do
- unless connected?
- start_connections!
- create_channels!
- configure_exchanges!
- end
+ def log_message(publisher, message, options = {})
+ logger.debug do
+ "Publishing <#{message}> to [#{publisher.exchange.name}] with routing_key [#{options[:routing_key]}]"
end
end
- def start_connections!
- @publish_connection ||= create_bunny_connection
- @publish_connection.start
-
- @republish_connection ||= create_bunny_connection
- @republish_connection.start
- end
-
- def create_channels!
- @publish_channel = @publish_connection.create_channel
- @republish_channel = @republish_connection.create_channel
- end
-
- def configure_exchanges!
- @publish_exchange = build_exchange(@publish_channel)
- @publish_exchange.on_return { |*attrs| handle_unrouted_messages(*attrs) }
-
- @publish_delayed_exchange = build_delayed_exchange(@publish_channel)
- @publish_delayed_exchange.on_return { |*attrs| handle_unrouted_delayed_messages(*attrs) }
-
- @republish_exchange = build_exchange(republish_channel)
- @republish_delayed_exchange = build_delayed_exchange(republish_channel)
- end
-
- def connected?
- @publish_connection&.connected? &&
- @republish_connection&.connected? &&
- @publish_channel &&
- @republish_channel
- end
-
- # Returned messages are processed asynchronously, so there is a risk of message loss on program exit or network failure.
- # A second connection is required because `on_return` is called within a frameset of the AMQP connection.
- # Any interaction on that connection (even via another channel) can lead to a connection error.
- # https://github.com/ruby-amqp/bunny/blob/7fb05abf36637557f75a69790be78f9cc1cea807/lib/bunny/session.rb#L683
- def handle_unrouted_messages(return_info, properties, message)
- @unrouted_message = true
-
- params = { message: message, return_info: return_info, properties: properties }
-
- raise(PublishError, params) if return_info.reply_code != 312 # NO_ROUTE
-
+ def on_message_return(return_info, properties, message)
if config_handle_unrouted_messages
- setup_routing_and_republish_message(params)
+ super
else
- logger.warn("Message is not routed! #{params}")
+ logger.warn do
+ "Message is not routed! #{{ message: message, return_info: return_info, properties: properties }}"
+ end
end
-
- @unrouted_message = false
- end
-
- def handle_unrouted_delayed_messages(return_info, properties, message)
- @unrouted_delayed_message = true
-
- params = { message: message, return_info: return_info, properties: properties }
-
- raise(PublishError, params) if return_info.reply_code != 312 # NO_ROUTE
-
- setup_routing_and_republish_delayed_message(params)
-
- @unrouted_delayed_message = false
- end
-
- # TODO: introduce a more reliable way to wait for unrouted message handling at exit
- def wait_for_unrouted_messages_processing(timeout:)
- sleep(0.05) # gives publish_exchange some time to receive the returned message
-
- return unless @unrouted_message || @unrouted_delayed_message
-
- logger.warn("Waiting up to #{timeout} seconds for unrouted messages handling")
-
- Timeout.timeout(timeout) { sleep 0.01 while @unrouted_message || @unrouted_delayed_message }
- rescue Timeout::Error
- logger.warn('Some unrouted messages are lost on process exit!')
- end
-
- def setup_routing_and_republish_message(message:, return_info:, properties:)
- logger.debug("Performing queue/binding setup & re-publish for unrouted message. #{{ message: message, return_info: return_info }}")
-
- routing_key = return_info.routing_key
-
- create_queue_and_binding(queue_name: deserialize(message).fetch('queue_name'), routing_key: routing_key)
-
- logger.debug "Re-publishing <#{message}> to [#{republish_exchange.name}] with routing_key [#{routing_key}]"
- republish_exchange.publish(message, properties.to_h.merge(routing_key: routing_key))
- end
-
- def create_queue_and_binding(queue_name:, routing_key:)
- logger.debug "Creating queue [#{queue_name}] and binding with routing_key [#{routing_key}] to [#{republish_exchange.name}]"
- republish_channel.queue(queue_name, config_sneakers[:queue_options]).tap do |queue|
- queue.bind(republish_exchange, routing_key: routing_key)
- republish_channel.deregister_queue(queue) # we are not going to work with this queue in this channel
- end
- end
-
- def setup_routing_and_republish_delayed_message(message:, return_info:, properties:)
- delay = properties.headers.fetch('delay').to_i
- queue_name = delayed_queue_name(delay: delay)
-
- logger.debug "Creating delayed queue [#{queue_name}]"
-
- create_delayed_queue_and_binding(queue_name: queue_name, delay: delay)
-
- republish_delayed_exchange.publish message, properties.to_h.merge(routing_key: return_info.routing_key)
- end
-
- def delayed_queue_name(delay:)
- [
- ::ActiveJob::Base.queue_name_prefix,
- [config_delayed_queue_prefix, delay].join(':')
- ].compact.join(::ActiveJob::Base.queue_name_delimiter)
- end
-
- def create_delayed_queue_and_binding(queue_name:, delay:)
- queue_arguments = {
- 'x-queue-mode' => 'lazy', # tell RabbitMQ not to keep this queue in RAM, as it has no consumers
- 'x-message-ttl' => delay * 1000, # messages expire after the requested delay
- 'x-dead-letter-exchange' => republish_exchange.name # expired messages are dead-lettered to the original exchange and routed to consumers (see the sketch after this method)
- }
-
- republish_channel.queue(queue_name, durable: true, arguments: queue_arguments).tap do |queue|
- queue.bind(republish_delayed_exchange, arguments: { delay: delay })
- republish_channel.deregister_queue(queue) # we are not going to work with this queue in this channel
- end
- end
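
The delay mechanism above is the standard RabbitMQ TTL + dead-letter-exchange pattern: one lazy queue per delay value holds messages until they expire, and expiry dead-letters them back to the regular exchange, which routes them with their original routing key. A condensed standalone sketch, with illustrative names rather than the gem's real ones:

    require 'bunny'

    connection = Bunny.new.start
    channel    = connection.create_channel

    work_exchange    = channel.exchange('activejob', type: :direct, durable: true)          # hypothetical
    delayed_exchange = channel.exchange('activejob-delayed', type: :headers, durable: true) # hypothetical

    delay = 60 # seconds

    # Messages wait here until x-message-ttl expires, then are dead-lettered
    # to the work exchange and routed with their original routing key.
    delayed_queue = channel.queue("delayed:#{delay}", durable: true, arguments: {
                                    'x-queue-mode'           => 'lazy',
                                    'x-message-ttl'          => delay * 1000,
                                    'x-dead-letter-exchange' => work_exchange.name
                                  })
    delayed_queue.bind(delayed_exchange, arguments: { delay: delay })

    # A plain "delay" header (no x- prefix) lets the headers exchange match the binding above.
    delayed_exchange.publish('{"job":"example"}', routing_key: 'mailers', headers: { delay: delay })
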
-
- def build_exchange(channel)
- channel.exchange(config_sneakers[:exchange], config_sneakers[:exchange_options])
- end
-
- def build_delayed_exchange(channel)
- channel.exchange([config_sneakers[:exchange], 'delayed'].join('-'), type: 'headers', durable: true)
- end
-
- def create_bunny_connection
- Bunny.new config_sneakers[:amqp],
- vhost: config_sneakers[:vhost],
- heartbeat: config_sneakers[:heartbeat],
- properties: config_sneakers.fetch(:properties, {})
- end
-
- def deserialize(message)
- Sneakers::ContentType.deserialize(message, AdvancedSneakersActiveJob::CONTENT_TYPE)
end
end
end
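
In 0.3.0 the connection, channel and republish plumbing moves into the bunny-publisher gem, and this class keeps only the ActiveJob-specific logging and the decision whether to handle unrouted messages. A hedged sketch of a publisher built from the same hooks; the callback names and signatures are taken from the code above, while the require path and the rest are assumptions:

    require 'bunny_publisher' # require path assumed for the bunny-publisher gem

    class MyPublisher < ::BunnyPublisher::Base
      include ::BunnyPublisher::Mandatory

      before_publish :log_message

      private

      def log_message(_publisher, message, options = {})
        puts "Publishing #{message} with routing_key #{options[:routing_key]}"
      end

      # The broker returned an unrouted message; `super` hands it to the Mandatory module,
      # which is expected to set up the queue/binding and republish (as 0.2.3 did by hand).
      def on_message_return(return_info, properties, message)
        super if return_info.reply_code == 312 # NO_ROUTE, mirroring the pre-0.3.0 check
      end
    end
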