lib/griffin/thread_pool.rb in griffin-0.1.9 vs lib/griffin/thread_pool.rb in griffin-0.2.0
- old
+ new
@@ -1,27 +1,29 @@
# frozen_string_literal: true
-require 'griffin/counting_semaphore'
+require 'grpc_kit/thread_pool/auto_trimmer'
module Griffin
class ThreadPool
- DEFAULT_POOL_SIZE = 20
- DEFAULT_QUEUE_SIZE = 512
+ DEFAULT_MAX = 5
+ DEFAULT_MIN = 1
+ QUEUE_SIZE = 128
- def initialize(pool_size = DEFAULT_POOL_SIZE, queue_size: DEFAULT_QUEUE_SIZE, &block)
- @pool_size = pool_size
- @queue_size = queue_size
+ def initialize(interval: 60, max: DEFAULT_MAX, min: DEFAULT_MIN, &block)
+ @max_pool_size = max
+ @min_pool_size = min
@block = block
@shutdown = false
- @semaphore = Griffin::CountingSemaphore.new(queue_size)
- @tasks = Queue.new
+ @tasks = SizedQueue.new(QUEUE_SIZE)
@spawned = 0
@workers = []
@mutex = Mutex.new
+ @waiting = 0
- @pool_size.times { spawn_thread }
+ @min_pool_size.times { spawn_thread }
+ @auto_trimmer = GrpcKit::ThreadPool::AutoTrimmer.new(self, interval: interval + rand(10)).tap(&:start!)
end
def schedule(task, &block)
if task.nil?
return
@@ -30,29 +32,35 @@
if @shutdown
raise "scheduling new task isn't allowed during shutdown"
end
# TODO: blocking now..
- @semaphore.wait
@tasks.push(block || task)
- @mutex.synchronize do
- if @spawned < @pool_size
- spawn_thread
- end
+ if @mutex.synchronize { (@waiting < @tasks.size) && (@spawned < @max_pool_size) }
+ spawn_thread
end
end
def shutdown
@shutdown = true
- @pool_size.times { @tasks.push(nil) }
+ @max_pool_size.times { @tasks.push(nil) }
+ @auto_trimmer.stop
until @workers.empty?
- Griffin.logger.debug("#{@pool_size - @spawned} worker thread(s) shutdowned, waiting #{@spawned}")
+ Griffin.logger.debug("Shutdown waiting #{@waiting} workers")
sleep 1
end
end
+ # For GrpcKit::ThreadPool::AutoTrimmer
+ def trim(force = false)
+ if @mutex.synchronize { (force || (@waiting > 0)) && (@spawned > @min_pool_size) }
+ GrpcKit.logger.info("Trim worker! Next worker size #{@spawned - 1}")
+ @tasks.push(nil)
+ end
+ end
+
private
def spawn_thread
@spawned += 1
worker = Thread.new(@spawned) do |i|
@@ -62,20 +70,20 @@
loop do
if @shutdown
break
end
+ @mutex.synchronize { @waiting += 1 }
task = @tasks.pop
+ @mutex.synchronize { @waiting -= 1 }
if task.nil?
break
end
begin
@block.call(task)
rescue Exception => e # rubocop:disable Lint/RescueException
Griffin.logger.error("An error occured on top level in worker #{Thread.current.name}: #{e.message} (#{e.class})\n #{Thread.current.backtrace.join("\n")} ")
- ensure
- @semaphore.signal
end
end
Griffin.logger.debug("worker thread #{Thread.current.name} is stopping")
@mutex.synchronize do