Sha256: 627df703ba95795ea783851360aca6a7e6f0074866f14223d83653b1ca3315e0
Contents?: true
Size: 1.86 KB
Versions: 4
Compression:
Stored size: 1.86 KB
Contents
# frozen_string_literal: true require "monitor" module Tobox class ThreadedPool < Pool def initialize(_configuration) @parent_thread = Thread.main @threads = [] @threads.extend(MonitorMixin) super end def start @workers.each do |wk| th = start_thread_worker(wk) @threads.synchronize do @threads << th end end end def stop shutdown_timeout = @configuration[:shutdown_timeout] grace_shutdown_timeout = @configuration[:grace_shutdown_timeout] super Thread.pass # let workers finish # soft exit join = lambda do |timeout| start = Process.clock_gettime(::Process::CLOCK_MONOTONIC) loop do terminating_th = @threads.synchronize { @threads.first } return unless terminating_th elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start break if elapsed > timeout terminating_th.join(timeout - elapsed) end end join.call(shutdown_timeout) # hard exit @threads.synchronize { @threads.each { |th| th.raise(KillError) } } join.call(grace_shutdown_timeout) @threads.synchronize { @threads.each(&:kill) } join.call(1) end private def start_thread_worker(wrk) Thread.start(wrk) do |worker| Thread.current.name = worker.label do_work(worker) @threads.synchronize do @threads.delete(Thread.current) if worker.finished? && @running idx = @workers.index(worker) raise Error, "worker not found" unless idx subst_worker = Worker.new(worker.label, @configuration) @workers[idx] = subst_worker subst_thread = start_thread_worker(subst_worker) @threads << subst_thread end end end end end end
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
tobox-0.6.1 | lib/tobox/pool/threaded_pool.rb |
tobox-0.6.0 | lib/tobox/pool/threaded_pool.rb |
tobox-0.5.2 | lib/tobox/pool/threaded_pool.rb |
tobox-0.5.1 | lib/tobox/pool/threaded_pool.rb |