lib/fluffle/server.rb in fluffle-0.7.2 vs lib/fluffle/server.rb in fluffle-0.8.0
- old
+ new
@@ -4,22 +4,23 @@
module Fluffle
class Server
include Connectable
attr_reader :confirms, :connection, :handlers, :handler_pool, :mandatory
- attr_accessor :publish_timeout
+ attr_accessor :publish_timeout, :shutdown_timeout
# url: - Optional URL to pass to `Bunny.new` to immediately connect
# concurrency: - Number of threads to handle messages on (default: 1)
# confirms: - Whether or not to use RabbitMQ confirms
def initialize(url: nil, connection: nil, concurrency: 1, confirms: false, mandatory: false)
url_or_connection = url || connection
self.connect(url_or_connection) if url_or_connection
- @confirms = confirms
- @mandatory = mandatory
- @publish_timeout = 5
+ @confirms = confirms
+ @mandatory = mandatory
+ @publish_timeout = 5
+ @shutdown_timeout = 15
@handlers = {}
@handler_pool = Concurrent::FixedThreadPool.new concurrency
self.class.default_server ||= self
@@ -66,18 +67,18 @@
queue = @channel.queue qualified_name
queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
@handler_pool.post do
begin
- @channel.ack delivery_info.delivery_tag
-
handle_request handler: handler,
properties: properties,
payload: payload
rescue => err
# Ensure we don't loose any errors on the handler pool's thread
Fluffle.logger.error "[Fluffle::Server] #{err.class}: #{err.message}\n#{err.backtrace.join("\n")}"
+ ensure
+ @channel.ack delivery_info.delivery_tag
end
end
end
end
@@ -107,10 +108,18 @@
while io = IO.select([signal_read])
readables = io.first
signal = readables.first.gets.strip
Fluffle.logger.info "Received #{signal}; shutting down..."
+
@channel.work_pool.shutdown
+
+ @handler_pool.shutdown
+ unless @handler_pool.wait_for_termination(@shutdown_timeout)
+ # `wait_for_termination` returns false if it didn't shut down in time,
+ # so we need to kill it
+ @handler_pool.kill
+ end
return
end
end