lib/dat-tcp/worker_pool.rb in dat-tcp-0.2.0 vs lib/dat-tcp/worker_pool.rb in dat-tcp-0.3.0

- old
+ new

@@ -1,18 +1,22 @@ +require 'system_timer' require 'thread' require 'dat-tcp/logger' module DatTCP + TimeoutError = Class.new(RuntimeError) + class WorkerPool attr_reader :logger, :spawned def initialize(min = 0, max = 1, debug = false, &serve_proc) @min_workers = min @max_workers = max - @logger = DatTCP::Logger.new(debug) + @debug = debug + @logger = DatTCP::Logger.new(@debug) @serve_proc = serve_proc @queue = DatTCP::Queue.new @workers_waiting = DatTCP::WorkersWaiting.new @@ -46,19 +50,26 @@ # Shutdown each worker and then the queue. Shutting down the queue will # signal any workers waiting on it to wake up, so they can start shutting # down. If a worker is processing a connection, then it will be joined and # allowed to finish. # **NOTE** Any connections that are on the queue are not served. - def shutdown - @workers.each(&:shutdown) - @queue.shutdown + def shutdown(timeout) + begin + SystemTimer.timeout(timeout, DatTCP::TimeoutError) do + @workers.each(&:shutdown) + @queue.shutdown - # use this pattern instead of `each` -- we don't want to call `join` on - # every worker (especially if they are shutting down on their own), we - # just want to make sure that any who haven't had a chance to finish - # get to (this is safe, otherwise you might get a dead thread in the - # `each`). - @workers.first.join until @workers.empty? + # use this pattern instead of `each` -- we don't want to call `join` on + # every worker (especially if they are shutting down on their own), we + # just want to make sure that any who haven't had a chance to finish + # get to (this is safe, otherwise you might get a dead thread in the + # `each`). + @workers.first.join until @workers.empty? + end + rescue DatTCP::TimeoutError => exception + exception.message.replace "Timed out shutting down the server" + @debug ? raise(exception) : self.logger.error(exception.message) + end end # public, because workers need to call it for themselves def despawn_worker(worker) @mutex.synchronize do