Sha256: 627df703ba95795ea783851360aca6a7e6f0074866f14223d83653b1ca3315e0

Contents?: true

Size: 1.86 KB

Versions: 4

Compression:

Stored size: 1.86 KB

Contents

# frozen_string_literal: true

require "monitor"

module Tobox
  class ThreadedPool < Pool
    def initialize(_configuration)
      @parent_thread = Thread.main
      @threads = []
      @threads.extend(MonitorMixin)
      super
    end

    def start
      @workers.each do |wk|
        th = start_thread_worker(wk)
        @threads.synchronize do
          @threads << th
        end
      end
    end

    def stop
      shutdown_timeout = @configuration[:shutdown_timeout]
      grace_shutdown_timeout = @configuration[:grace_shutdown_timeout]

      super
      Thread.pass # let workers finish

      # soft exit
      join = lambda do |timeout|
        start = Process.clock_gettime(::Process::CLOCK_MONOTONIC)

        loop do
          terminating_th = @threads.synchronize { @threads.first }

          return unless terminating_th

          elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start

          break if elapsed > timeout

          terminating_th.join(timeout - elapsed)
        end
      end

      join.call(shutdown_timeout)

      # hard exit
      @threads.synchronize { @threads.each { |th| th.raise(KillError) } }
      join.call(grace_shutdown_timeout)
      @threads.synchronize { @threads.each(&:kill) }
      join.call(1)
    end

    private

    def start_thread_worker(wrk)
      Thread.start(wrk) do |worker|
        Thread.current.name = worker.label

        do_work(worker)

        @threads.synchronize do
          @threads.delete(Thread.current)

          if worker.finished? && @running
            idx = @workers.index(worker)

            raise Error, "worker not found" unless idx

            subst_worker = Worker.new(worker.label, @configuration)
            @workers[idx] = subst_worker
            subst_thread = start_thread_worker(subst_worker)
            @threads << subst_thread
          end
        end
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
tobox-0.6.1 lib/tobox/pool/threaded_pool.rb
tobox-0.6.0 lib/tobox/pool/threaded_pool.rb
tobox-0.5.2 lib/tobox/pool/threaded_pool.rb
tobox-0.5.1 lib/tobox/pool/threaded_pool.rb