Sha256: c964d0443e57384c55dfae2eba8b164af34f94724069c53a28bcffd7fbf37fac
Contents?: true
Size: 1.67 KB
Versions: 1
Compression:
Stored size: 1.67 KB
Contents
# frozen_string_literal: true require "monitor" module Tobox class ThreadedPool < Pool def initialize(_configuration) @parent_thread = Thread.main @threads = [] @threads.extend(MonitorMixin) super end def start @workers.each do |wk| th = start_thread_worker(wk) @threads.synchronize do @threads << th end end end def stop shutdown_timeout = @configuration[:shutdown_timeout] deadline = Process.clock_gettime(::Process::CLOCK_MONOTONIC) super Thread.pass # let workers finish # soft exit while Process.clock_gettime(::Process::CLOCK_MONOTONIC) - deadline < shutdown_timeout return if @threads.empty? sleep 0.5 end # hard exit @threads.each { |th| th.raise(KillError) } while (th = @threads.pop) th.value # waits end end private def start_thread_worker(wrk) Thread.start(wrk) do |worker| Thread.current.name = worker.label do_work(worker) @threads.synchronize do @threads.delete(Thread.current) if worker.finished? && @running idx = @workers.index(worker) raise Error, "worker not found" unless idx subst_worker = Worker.new(worker.label, @configuration) @workers[idx] = subst_worker subst_thread = start_thread_worker(subst_worker) @threads << subst_thread end # all workers went down abruply, we need to kill the process. # @parent_thread.raise(Interrupt) if wk.finished? && @threads.empty? && @running end end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
tobox-0.5.0 | lib/tobox/pool/threaded_pool.rb |