Sha256: 66946730c86f1424d1e6c3bfbf9b2d9c13c8b5878aaef15ebd52329776fe5b41
Contents?: true
Size: 1.9 KB
Versions: 1
Compression:
Stored size: 1.9 KB
Contents
# frozen_string_literal: true module Tobox class ThreadedPool < Pool def initialize(_configuration) @parent_thread = Thread.main @threads = [] @threads_mutex = Thread::Mutex.new super end def start @workers.each do |wk| th = start_thread_worker(wk) @threads_mutex.synchronize do @threads << th end end end def stop shutdown_timeout = @configuration[:shutdown_timeout] grace_shutdown_timeout = @configuration[:grace_shutdown_timeout] super Thread.pass # let workers finish # soft exit join = lambda do |timeout| start = Process.clock_gettime(::Process::CLOCK_MONOTONIC) loop do terminating_th = @threads_mutex.synchronize { @threads.first } return unless terminating_th elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start break if elapsed > timeout terminating_th.join(timeout - elapsed) end end join.call(shutdown_timeout) # hard exit @threads_mutex.synchronize { @threads.each { |th| th.raise(KillError) } } join.call(grace_shutdown_timeout) @threads_mutex.synchronize { @threads.each(&:kill) } join.call(1) end private def start_thread_worker(wrk) Thread.start(wrk) do |worker| Thread.current.name = worker.label do_work(worker) @threads_mutex.synchronize do @threads.delete(Thread.current) if worker.finished? && @running idx = @workers.index(worker) raise Error, "worker not found" unless idx subst_worker = @configuration.worker_class.new(worker.label, @configuration) @workers[idx] = subst_worker subst_thread = start_thread_worker(subst_worker) @threads << subst_thread end end end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
tobox-0.7.0 | lib/tobox/pool/threaded_pool.rb |