Sha256: 4703f8d97d504f09174e9fcf0817a147d4eaaed87fa0225ee89a9e3f6c2c73b4

Contents?: true

Size: 1.89 KB

Versions: 1

Compression:

Stored size: 1.89 KB

Contents

# frozen_string_literal: true

require "timeout"
require "async/scheduler"

module Tobox
  # Pool flavour that runs every worker inside its own fiber. All fibers are
  # created on one dedicated thread (named "tobox-fibers-thread") whose fiber
  # scheduler is an Async::Scheduler.
  #
  # Shared state (@workers, @running, @configuration) and the #do_work hook are
  # inherited from Pool, which is not visible in this file.
  class FiberPool < Pool
    # Loads Sequel's +fiber_concurrency+ extension, forwards the argument to the
    # parent initializer, and sets up the fiber bookkeeping.
    def initialize(_)
      Sequel.extension(:fiber_concurrency)
      super
      @fibers = []

      @fiber_mtx = Mutex.new
      @fiber_cond = ConditionVariable.new
      @fiber_thread = nil
      # Predicate for @fiber_cond, only written while holding @fiber_mtx. It lets
      # #start tell "already signalled" apart from "not signalled yet".
      @fibers_started = false
    end

    # Spawns the fiber thread and blocks the caller until that thread has
    # scheduled one fiber per worker (or has given up trying).
    def start
      @fiber_thread = Thread.start do
        Thread.current.name = "tobox-fibers-thread"

        begin
          Fiber.set_scheduler(Async::Scheduler.new)

          @fiber_mtx.synchronize do
            @workers.each do |worker|
              @fibers << start_fiber_worker(worker)
            end
            signal_fibers_started
          end
        rescue KillError
          # Forward the kill request to every fiber still being tracked.
          @fibers.each { |f| f.raise(KillError) }
        ensure
          # If setup aborted before signalling, still release the thread that is
          # blocked in #start; otherwise it would wait forever.
          @fiber_mtx.synchronize { signal_fibers_started } unless @fibers_started
        end
      end

      @fiber_mtx.synchronize do
        # Waiting on a predicate (rather than a bare #wait) covers both the case
        # where the fiber thread signalled before we got here and spurious
        # wakeups of the condition variable.
        @fiber_cond.wait(@fiber_mtx) until @fibers_started
      end
    end

    # Stops the pool and tears down the fiber thread, escalating in steps:
    # wait +shutdown_timeout+ for a clean exit, then raise KillError in the
    # thread and wait +grace_shutdown_timeout+, then kill it outright.
    def stop
      shutdown_timeout = @configuration[:shutdown_timeout]
      grace_shutdown_timeout = @configuration[:grace_shutdown_timeout]

      # Parent-class stop logic runs first (defined in Pool, not shown here).
      super

      th = @fiber_thread

      # Nothing to tear down when #start was never called.
      return unless th

      th.join(shutdown_timeout)

      return unless th.alive?

      th.raise(KillError)
      th.join(grace_shutdown_timeout)
      # Thread#kill on an already finished thread is a no-op, so this is safe
      # regardless of whether the KillError was enough.
      th.kill
      th.join(1)
    end

    private

    # Flags fiber setup as finished and wakes the thread blocked in #start.
    # Must be called with @fiber_mtx held.
    def signal_fibers_started
      @fibers_started = true
      @fiber_cond.signal
    end

    # Ignores Async::Stop; every other exception is handed to the parent
    # handler.
    def handle_exception(wrk, exc)
      return if exc.is_a?(::Async::Stop)

      super
    end

    # Schedules a fiber that runs +worker+ via #do_work. Once #do_work returns,
    # the fiber removes itself from @fibers and, if the worker reports
    # +finished?+ while the pool is still running, swaps in a fresh worker with
    # the same label and schedules a fiber for it.
    #
    # Returns whatever Fiber.schedule returns; callers store it in @fibers.
    def start_fiber_worker(worker)
      Fiber.schedule do
        do_work(worker)

        @fiber_mtx.synchronize do
          @fibers.delete(Fiber.current)

          if worker.finished? && @running
            idx = @workers.index(worker)

            raise Error, "worker not found" unless idx

            # Replace the finished worker in place so @workers keeps its size
            # and ordering.
            subst_worker = @configuration.worker_class.new(worker.label, @configuration)
            @workers[idx] = subst_worker
            @fibers << start_fiber_worker(subst_worker)
          end
        end
      rescue KillError
        # noop: the fiber simply ends when it is told to die.
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
tobox-0.7.0 lib/tobox/pool/fiber_pool.rb