lib/bixby-common/util/thread_pool.rb in bixby-common-0.6.2 vs lib/bixby-common/util/thread_pool.rb in bixby-common-0.6.3

- old
+ new

@@ -30,11 +30,11 @@ def enqueue(command, block=nil) logger.debug { "enqueue new task: #{command}" } @input_queue.push(Task.new(command, block)) if command == :perform then - grow_pool + grow end nil end def perform(&block) @@ -123,11 +123,10 @@ remove_worker(worker) block.call if block end enqueue(:shutdown, callback) - @size -= 1 end end nil end @@ -163,12 +162,12 @@ logger.debug "spawning new worker thread" exit_handler = lambda { |worker, reason| @lock.synchronize do if reason == :exception or (reason == :timeout && @size > @min_size) then - @size -= 1 remove_worker(worker) + grow return true end false end } @@ -177,28 +176,40 @@ @size += 1 end end # Grow the pool by one if we have more jobs than idle workers - def grow_pool + def grow @lock.synchronize do + prune logger.debug { "jobs: #{num_jobs}; busy: #{num_working}; idle: #{num_idle}" } - if @size < @max_size && num_jobs > 0 && num_jobs > num_idle then + if @size == 0 || (@size < @max_size && num_jobs > 0 && num_jobs > num_idle) then space = @max_size-@size jobs = num_jobs-num_idle needed = space < jobs ? space : jobs + needed = 1 if needed <= 0 expand(needed) else logger.debug "NOT growing the pool!" end end nil end + # Remove any dead worker threads which may not have been cleaned up properly + # (via callback handler) + def prune + @lock.synchronize do + @workers.delete_if { |w| !w.alive? } + @size = @workers.size + end + end + def remove_worker(worker) @lock.synchronize do @workers.delete(worker) + @size -= 1 end nil end end