lib/bixby-common/util/thread_pool.rb in bixby-common-0.6.2 vs lib/bixby-common/util/thread_pool.rb in bixby-common-0.6.3
- old
+ new
@@ -30,11 +30,11 @@
def enqueue(command, block=nil)
logger.debug { "enqueue new task: #{command}" }
@input_queue.push(Task.new(command, block))
if command == :perform then
- grow_pool
+ grow
end
nil
end
def perform(&block)
@@ -123,11 +123,10 @@
remove_worker(worker)
block.call if block
end
enqueue(:shutdown, callback)
- @size -= 1
end
end
nil
end
@@ -163,12 +162,12 @@
logger.debug "spawning new worker thread"
exit_handler = lambda { |worker, reason|
@lock.synchronize do
if reason == :exception or (reason == :timeout && @size > @min_size) then
- @size -= 1
remove_worker(worker)
+ grow
return true
end
false
end
}
@@ -177,28 +176,40 @@
@size += 1
end
end
# Grow the pool by one if we have more jobs than idle workers
- def grow_pool
+ def grow
@lock.synchronize do
+ prune
logger.debug { "jobs: #{num_jobs}; busy: #{num_working}; idle: #{num_idle}" }
- if @size < @max_size && num_jobs > 0 && num_jobs > num_idle then
+ if @size == 0 || (@size < @max_size && num_jobs > 0 && num_jobs > num_idle) then
space = @max_size-@size
jobs = num_jobs-num_idle
needed = space < jobs ? space : jobs
+ needed = 1 if needed <= 0
expand(needed)
else
logger.debug "NOT growing the pool!"
end
end
nil
end
+ # Remove any dead worker threads which may not have been cleaned up properly
+ # (via callback handler)
+ def prune
+ @lock.synchronize do
+ @workers.delete_if { |w| !w.alive? }
+ @size = @workers.size
+ end
+ end
+
def remove_worker(worker)
@lock.synchronize do
@workers.delete(worker)
+ @size -= 1
end
nil
end
end