Sha256: 8936f26dcbdad21c15cc08c42972b926f195d702830f77334cccfcf9b3abff05

Contents?: true

Size: 1.29 KB

Versions: 1

Compression:

Stored size: 1.29 KB

Contents

# frozen_string_literal: true

module Tobox
  # Interrupt subclass that Pool#handle_exception deliberately ignores: when a
  # worker's run ends with it, the worker is not marked finished, nothing is
  # logged and no error handler is invoked.
  class KillError < Interrupt
  end

  # Owns a fixed set of workers and the crash handling shared by them.
  #
  # The pool builds +configuration[:concurrency]+ workers up front, each an
  # instance of +configuration.worker_class+. Nothing in this class calls
  # #do_work itself; presumably the concrete pools (ThreadedPool, FiberPool)
  # drive the workers through it -- confirm against those classes.
  class Pool
    # @param configuration [#default_logger, #worker_class, #lifecycle_events, #[]]
    #   source of the logger, the worker class, the +:concurrency+ setting and
    #   the +:error_worker+ lifecycle handlers.
    def initialize(configuration)
      @configuration = configuration
      @logger = @configuration.default_logger
      @num_workers = @configuration[:concurrency]
      @workers = Array.new(@num_workers) do |idx|
        @configuration.worker_class.new("tobox-worker-#{idx}", @configuration)
      end
      # Array() tolerates a missing (nil) entry as well as a single handler.
      @worker_error_handlers = Array(@configuration.lifecycle_events[:error_worker])
      @running = true
    end

    # Asks every worker to finish and marks the pool as stopped.
    # Calling it again once the pool has stopped does nothing.
    def stop
      return unless @running

      @workers.each(&:finish!)
      @running = false
    end

    private

    # Calls +wrk.work+ and routes anything it raises to #handle_exception.
    #
    # Exception (rather than StandardError) is rescued on purpose: KillError
    # descends from Interrupt, which a plain +rescue+ would not catch.
    #
    # @param wrk [#work, #finish!, #label] the worker to run
    def do_work(wrk)
      wrk.work
    rescue Exception => e # rubocop:disable Lint/RescueException
      handle_exception(wrk, e)
    end

    # Reacts to a worker crash: finishes the worker, logs the error and then
    # notifies every +:error_worker+ handler. A KillError is ignored entirely.
    #
    # @param wrk [#finish!, #label] the worker that crashed
    # @param exc [Exception] the error that ended the worker's run
    def handle_exception(wrk, exc)
      return if exc.is_a?(KillError)

      wrk.finish!
      @logger.error do
        # Exception#backtrace is nil when the exception was never raised (or
        # had its backtrace cleared); Array() keeps the report from failing
        # with a NoMethodError that would hide the original error.
        "(worker: #{wrk.label}) -> " \
          "crashed with error\n" \
          "#{exc.class}: #{exc.message}\n" \
          "#{Array(exc.backtrace).join("\n")}"
      end
      @worker_error_handlers.each { |hd| hd.call(exc) }
    end
  end

  # Register the pool implementations that live under the "pool" directory
  # next to this file; each is loaded on first reference to its constant.
  pool_dir = File.join(__dir__, "pool")
  autoload :ThreadedPool, File.join(pool_dir, "threaded_pool")
  autoload :FiberPool, File.join(pool_dir, "fiber_pool")
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
tobox-0.7.0 lib/tobox/pool.rb