lib/fluffle/server.rb in fluffle-0.9.0 vs lib/fluffle/server.rb in fluffle-0.9.1
- old
+ new
@@ -20,10 +20,11 @@
@publish_timeout = 5
@shutdown_timeout = 15
@handlers = {}
@handler_pool = Concurrent::FixedThreadPool.new concurrency
+ @consumers = []
self.class.default_server ||= self
end
class << self
@@ -64,11 +65,11 @@
@handlers.each do |name, handler|
qualified_name = Fluffle.request_queue_name name
queue = @channel.queue qualified_name
- queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
+ consumer = queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
@handler_pool.post do
begin
handle_request handler: handler,
properties: properties,
payload: payload
@@ -78,10 +79,12 @@
ensure
@channel.ack delivery_info.delivery_tag
end
end
end
+
+ @consumers << consumer
end
self.wait_for_signal
end
@@ -109,17 +112,22 @@
readables = io.first
signal = readables.first.gets.strip
Fluffle.logger.info "Received #{signal}; shutting down..."
- @channel.close
+ # First stop the consumers from receiving messages
+ @consumers.each &:cancel
+ # Then wait for worker pools to finish processing their active jobs
@handler_pool.shutdown
unless @handler_pool.wait_for_termination(@shutdown_timeout)
# `wait_for_termination` returns false if it didn't shut down in time,
# so we need to kill it
@handler_pool.kill
end
+
+ # Finally close the connection
+ @channel.close
return
end
end