lib/fluffle/server.rb in fluffle-0.7.2 vs lib/fluffle/server.rb in fluffle-0.8.0

- old
+ new

@@ -4,22 +4,23 @@ module Fluffle class Server include Connectable attr_reader :confirms, :connection, :handlers, :handler_pool, :mandatory - attr_accessor :publish_timeout + attr_accessor :publish_timeout, :shutdown_timeout # url: - Optional URL to pass to `Bunny.new` to immediately connect # concurrency: - Number of threads to handle messages on (default: 1) # confirms: - Whether or not to use RabbitMQ confirms def initialize(url: nil, connection: nil, concurrency: 1, confirms: false, mandatory: false) url_or_connection = url || connection self.connect(url_or_connection) if url_or_connection - @confirms = confirms - @mandatory = mandatory - @publish_timeout = 5 + @confirms = confirms + @mandatory = mandatory + @publish_timeout = 5 + @shutdown_timeout = 15 @handlers = {} @handler_pool = Concurrent::FixedThreadPool.new concurrency self.class.default_server ||= self @@ -66,18 +67,18 @@ queue = @channel.queue qualified_name queue.subscribe(manual_ack: true) do |delivery_info, properties, payload| @handler_pool.post do begin - @channel.ack delivery_info.delivery_tag - handle_request handler: handler, properties: properties, payload: payload rescue => err # Ensure we don't loose any errors on the handler pool's thread Fluffle.logger.error "[Fluffle::Server] #{err.class}: #{err.message}\n#{err.backtrace.join("\n")}" + ensure + @channel.ack delivery_info.delivery_tag end end end end @@ -107,10 +108,18 @@ while io = IO.select([signal_read]) readables = io.first signal = readables.first.gets.strip Fluffle.logger.info "Received #{signal}; shutting down..." + @channel.work_pool.shutdown + + @handler_pool.shutdown + unless @handler_pool.wait_for_termination(@shutdown_timeout) + # `wait_for_termination` returns false if it didn't shut down in time, + # so we need to kill it + @handler_pool.kill + end return end end