lib/griffin/thread_pool.rb in griffin-0.1.9 vs lib/griffin/thread_pool.rb in griffin-0.2.0

- old
+ new

@@ -1,27 +1,29 @@ # frozen_string_literal: true -require 'griffin/counting_semaphore' +require 'grpc_kit/thread_pool/auto_trimmer' module Griffin class ThreadPool - DEFAULT_POOL_SIZE = 20 - DEFAULT_QUEUE_SIZE = 512 + DEFAULT_MAX = 5 + DEFAULT_MIN = 1 + QUEUE_SIZE = 128 - def initialize(pool_size = DEFAULT_POOL_SIZE, queue_size: DEFAULT_QUEUE_SIZE, &block) - @pool_size = pool_size - @queue_size = queue_size + def initialize(interval: 60, max: DEFAULT_MAX, min: DEFAULT_MIN, &block) + @max_pool_size = max + @min_pool_size = min @block = block @shutdown = false - @semaphore = Griffin::CountingSemaphore.new(queue_size) - @tasks = Queue.new + @tasks = SizedQueue.new(QUEUE_SIZE) @spawned = 0 @workers = [] @mutex = Mutex.new + @waiting = 0 - @pool_size.times { spawn_thread } + @min_pool_size.times { spawn_thread } + @auto_trimmer = GrpcKit::ThreadPool::AutoTrimmer.new(self, interval: interval + rand(10)).tap(&:start!) end def schedule(task, &block) if task.nil? return @@ -30,29 +32,35 @@ if @shutdown raise "scheduling new task isn't allowed during shutdown" end # TODO: blocking now.. - @semaphore.wait @tasks.push(block || task) - @mutex.synchronize do - if @spawned < @pool_size - spawn_thread - end + if @mutex.synchronize { (@waiting < @tasks.size) && (@spawned < @max_pool_size) } + spawn_thread end end def shutdown @shutdown = true - @pool_size.times { @tasks.push(nil) } + @max_pool_size.times { @tasks.push(nil) } + @auto_trimmer.stop until @workers.empty? - Griffin.logger.debug("#{@pool_size - @spawned} worker thread(s) shutdowned, waiting #{@spawned}") + Griffin.logger.debug("Shutdown waiting #{@waiting} workers") sleep 1 end end + # For GrpcKit::ThreadPool::AutoTrimmer + def trim(force = false) + if @mutex.synchronize { (force || (@waiting > 0)) && (@spawned > @min_pool_size) } + GrpcKit.logger.info("Trim worker! Next worker size #{@spawned - 1}") + @tasks.push(nil) + end + end + private def spawn_thread @spawned += 1 worker = Thread.new(@spawned) do |i| @@ -62,20 +70,20 @@ loop do if @shutdown break end + @mutex.synchronize { @waiting += 1 } task = @tasks.pop + @mutex.synchronize { @waiting -= 1 } if task.nil? break end begin @block.call(task) rescue Exception => e # rubocop:disable Lint/RescueException Griffin.logger.error("An error occured on top level in worker #{Thread.current.name}: #{e.message} (#{e.class})\n #{Thread.current.backtrace.join("\n")} ") - ensure - @semaphore.signal end end Griffin.logger.debug("worker thread #{Thread.current.name} is stopping") @mutex.synchronize do