Sha256: 2fe3408b773160fe0d172acfc3a2860ba764c4ad4881f4580690f81b6920fd3d

Contents?: true

Size: 1.24 KB

Versions: 7

Compression:

Stored size: 1.24 KB

Contents

# frozen_string_literal: true

module Tobox
  # Pool implementation that runs each worker in a dedicated thread.
  #
  # NOTE(review): @configuration and @workers are read here but never assigned
  # in this class; presumably the parent Pool#initialize sets them - confirm.
  class ThreadedPool < Pool
    # Raised into worker threads by #stop to force them out of their work loop
    # once the shutdown timeout has elapsed.
    class KillError < Interrupt; end

    # Sets up the thread registry and caches the configured :error lifecycle
    # handlers. @threads is assigned before calling super, so it already exists
    # while the parent initializer runs.
    def initialize(_configuration)
      @threads = []
      super
      @error_handlers = Array(@configuration.lifecycle_events[:error])
    end

    # Starts one thread per worker, named "tobox-worker-<index>", and registers
    # it in @threads. A thread removes itself from @threads when it finishes,
    # whatever the reason (normal return, KillError, or an unexpected error).
    #
    # Unexpected errors are handed to every error handler as
    # (:tobox_error, exception) and then re-raised, terminating that thread.
    def start
      @workers.each_with_index do |wk, idx|
        th = Thread.start do
          Thread.current.name = "tobox-worker-#{idx}"

          begin
            wk.work
          rescue KillError
            # noop
          rescue Exception => e # rubocop:disable Lint/RescueException
            @error_handlers.each { |hd| hd.call(:tobox_error, e) }
            raise e
          ensure
            # Deregister in every exit path; a crashed worker left in @threads
            # would make #stop wait for the full timeout and then re-raise the
            # (already reported) error from the dead thread.
            @threads.delete(Thread.current)
          end
        end
        @threads << th
      end
    end

    # Stops the pool in two phases:
    #
    # 1. soft exit: after calling super, polls every 0.5s until all worker
    #    threads have deregistered themselves, for up to
    #    @configuration[:shutdown_timeout] seconds (measured from the moment
    #    #stop was called);
    # 2. hard exit: raises KillError in every thread still registered and
    #    waits for each of them to terminate.
    #
    # NOTE(review): super presumably signals the workers to finish - confirm
    # against Pool#stop.
    def stop
      shutdown_timeout = @configuration[:shutdown_timeout]

      started_at = Process.clock_gettime(::Process::CLOCK_MONOTONIC)

      super
      Thread.pass # let workers finish

      # soft exit
      while Process.clock_gettime(::Process::CLOCK_MONOTONIC) - started_at < shutdown_timeout
        return if @threads.empty?

        sleep 0.5
      end

      # hard exit
      # Iterate over a snapshot: interrupted workers delete themselves from
      # @threads, and mutating the array mid-iteration could skip a thread.
      @threads.dup.each { |th| th.raise(KillError) }
      while (th = @threads.pop)
        begin
          th.join # waits
        rescue KillError
          # The interrupt landed outside of the worker's own rescue clause
          # (e.g. while error handlers were running); the thread is gone,
          # which is all that matters here.
        end
      end
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
tobox-0.1.6 lib/tobox/pool/threaded_pool.rb
tobox-0.1.5 lib/tobox/pool/threaded_pool.rb
tobox-0.1.4 lib/tobox/pool/threaded_pool.rb
tobox-0.1.3 lib/tobox/pool/threaded_pool.rb
tobox-0.1.2 lib/tobox/pool/threaded_pool.rb
tobox-0.1.1 lib/tobox/pool/threaded_pool.rb
tobox-0.1.0 lib/tobox/pool/threaded_pool.rb