lib/fluffle/server.rb in fluffle-0.9.0 vs lib/fluffle/server.rb in fluffle-0.9.1

- old
+ new

@@ -20,10 +20,11 @@ @publish_timeout = 5 @shutdown_timeout = 15 @handlers = {} @handler_pool = Concurrent::FixedThreadPool.new concurrency + @consumers = [] self.class.default_server ||= self end class << self @@ -64,11 +65,11 @@ @handlers.each do |name, handler| qualified_name = Fluffle.request_queue_name name queue = @channel.queue qualified_name - queue.subscribe(manual_ack: true) do |delivery_info, properties, payload| + consumer = queue.subscribe(manual_ack: true) do |delivery_info, properties, payload| @handler_pool.post do begin handle_request handler: handler, properties: properties, payload: payload @@ -78,10 +79,12 @@ ensure @channel.ack delivery_info.delivery_tag end end end + + @consumers << consumer end self.wait_for_signal end @@ -109,17 +112,22 @@ readables = io.first signal = readables.first.gets.strip Fluffle.logger.info "Received #{signal}; shutting down..." - @channel.close + # First stop the consumers from receiving messages + @consumers.each &:cancel + # Then wait for worker pools to finish processing their active jobs @handler_pool.shutdown unless @handler_pool.wait_for_termination(@shutdown_timeout) # `wait_for_termination` returns false if it didn't shut down in time, # so we need to kill it @handler_pool.kill end + + # Finally close the connection + @channel.close return end end