lib/fluffle/server.rb in fluffle-0.7.0 vs lib/fluffle/server.rb in fluffle-0.7.1
- old
+ new
@@ -3,21 +3,22 @@
module Fluffle
class Server
include Connectable
- attr_reader :confirms, :connection, :handlers, :handler_pool
+ attr_reader :confirms, :connection, :handlers, :handler_pool, :mandatory
attr_accessor :publish_timeout
# url: - Optional URL to pass to `Bunny.new` to immediately connect
# concurrency: - Number of threads to handle messages on (default: 1)
# confirms: - Whether or not to use RabbitMQ confirms
- def initialize(url: nil, connection: nil, concurrency: 1, confirms: false)
+ def initialize(url: nil, connection: nil, concurrency: 1, confirms: false, mandatory: false)
url_or_connection = url || connection
self.connect(url_or_connection) if url_or_connection
@confirms = confirms
+ @mandatory = mandatory
@publish_timeout = 5
@handlers = {}
@handler_pool = Concurrent::FixedThreadPool.new concurrency
@@ -44,15 +45,22 @@
@handlers.freeze
@channel = @connection.create_channel
@exchange = @channel.default_exchange
+ # Ensure we only receive 1 message at a time for each consumer
+ @channel.prefetch 1
+
if confirms
@confirmer = Fluffle::Confirmer.new channel: @channel
@confirmer.confirm_select
end
+ if mandatory
+ handle_returns
+ end
+
raise 'No handlers defined' if @handlers.empty?
@handlers.each do |name, handler|
qualified_name = Fluffle.request_queue_name name
queue = @channel.queue qualified_name
@@ -72,9 +80,16 @@
end
end
end
self.wait_for_signal
+ end
+
+ def handle_returns
+ @exchange.on_return do |return_info, _properties, _payload|
+ message = Kernel.sprintf "Received return from exchange for routing key `%s' (%d %s)", return_info.routing_key, return_info.reply_code, return_info.reply_text
+ Fluffle.logger.error "[Fluffle::Server] #{message}"
+ end
end
# NOTE: Keeping this in its own method so its functionality can be more
# easily overwritten by `Fluffle::Testing`.
def wait_for_signal