lib/dat-tcp/worker_pool.rb in dat-tcp-0.2.0 vs lib/dat-tcp/worker_pool.rb in dat-tcp-0.3.0
- old
+ new
@@ -1,18 +1,22 @@
+require 'system_timer'
require 'thread'
require 'dat-tcp/logger'
module DatTCP
+ TimeoutError = Class.new(RuntimeError)
+
class WorkerPool
attr_reader :logger, :spawned
def initialize(min = 0, max = 1, debug = false, &serve_proc)
@min_workers = min
@max_workers = max
- @logger = DatTCP::Logger.new(debug)
+ @debug = debug
+ @logger = DatTCP::Logger.new(@debug)
@serve_proc = serve_proc
@queue = DatTCP::Queue.new
@workers_waiting = DatTCP::WorkersWaiting.new
@@ -46,19 +50,26 @@
# Shutdown each worker and then the queue. Shutting down the queue will
# signal any workers waiting on it to wake up, so they can start shutting
# down. If a worker is processing a connection, then it will be joined and
# allowed to finish.
# **NOTE** Any connections that are on the queue are not served.
- def shutdown
- @workers.each(&:shutdown)
- @queue.shutdown
+ def shutdown(timeout)
+ begin
+ SystemTimer.timeout(timeout, DatTCP::TimeoutError) do
+ @workers.each(&:shutdown)
+ @queue.shutdown
- # use this pattern instead of `each` -- we don't want to call `join` on
- # every worker (especially if they are shutting down on their own), we
- # just want to make sure that any who haven't had a chance to finish
- # get to (this is safe, otherwise you might get a dead thread in the
- # `each`).
- @workers.first.join until @workers.empty?
+ # use this pattern instead of `each` -- we don't want to call `join` on
+ # every worker (especially if they are shutting down on their own), we
+ # just want to make sure that any who haven't had a chance to finish
+ # get to (this is safe, otherwise you might get a dead thread in the
+ # `each`).
+ @workers.first.join until @workers.empty?
+ end
+ rescue DatTCP::TimeoutError => exception
+ exception.message.replace "Timed out shutting down the server"
+ @debug ? raise(exception) : self.logger.error(exception.message)
+ end
end
# public, because workers need to call it for themselves
def despawn_worker(worker)
@mutex.synchronize do