lib/fluffle/server.rb in fluffle-0.7.0 vs lib/fluffle/server.rb in fluffle-0.7.1

- old
+ new

@@ -3,21 +3,22 @@ module Fluffle class Server include Connectable - attr_reader :confirms, :connection, :handlers, :handler_pool + attr_reader :confirms, :connection, :handlers, :handler_pool, :mandatory attr_accessor :publish_timeout # url: - Optional URL to pass to `Bunny.new` to immediately connect # concurrency: - Number of threads to handle messages on (default: 1) # confirms: - Whether or not to use RabbitMQ confirms - def initialize(url: nil, connection: nil, concurrency: 1, confirms: false) + def initialize(url: nil, connection: nil, concurrency: 1, confirms: false, mandatory: false) url_or_connection = url || connection self.connect(url_or_connection) if url_or_connection @confirms = confirms + @mandatory = mandatory @publish_timeout = 5 @handlers = {} @handler_pool = Concurrent::FixedThreadPool.new concurrency @@ -44,15 +45,22 @@ @handlers.freeze @channel = @connection.create_channel @exchange = @channel.default_exchange + # Ensure we only receive 1 message at a time for each consumer + @channel.prefetch 1 + if confirms @confirmer = Fluffle::Confirmer.new channel: @channel @confirmer.confirm_select end + if mandatory + handle_returns + end + raise 'No handlers defined' if @handlers.empty? @handlers.each do |name, handler| qualified_name = Fluffle.request_queue_name name queue = @channel.queue qualified_name @@ -72,9 +80,16 @@ end end end self.wait_for_signal + end + + def handle_returns + @exchange.on_return do |return_info, _properties, _payload| + message = Kernel.sprintf "Received return from exchange for routing key `%s' (%d %s)", return_info.routing_key, return_info.reply_code, return_info.reply_text + Fluffle.logger.error "[Fluffle::Server] #{message}" + end end # NOTE: Keeping this in its own method so its functionality can be more # easily overwritten by `Fluffle::Testing`. def wait_for_signal