Sha256: 07f2a2e9da597b975fb313ef8bedd754a2fd9fa65de8ce7cbb1bca0f3688de9e

Contents?: true

Size: 1.81 KB

Versions: 4

Compression:

Stored size: 1.81 KB

Contents

# frozen_string_literal: true

require "timeout"
require "fiber_scheduler"

module Tobox
  # Pool implementation which runs every worker inside its own fiber, all of
  # them hosted by a single dedicated thread driven by +FiberScheduler+.
  #
  # Relies on state and helpers that are not defined in this file and are
  # presumably provided by +Pool+: +@workers+, +@running+, +@configuration+
  # and +do_work+.
  class FiberPool < Pool
    # Loads Sequel's +:fiber_concurrency+ extension, delegates to the parent
    # initializer with the same argument, and sets up the bookkeeping used to
    # track worker fibers.
    def initialize(_)
      Sequel.extension(:fiber_concurrency)
      super
      # fibers currently running a worker; guarded by @fiber_mtx.
      @fibers = []

      @fiber_mtx = Mutex.new
      @fiber_cond = ConditionVariable.new
      @fiber_thread = nil
    end

    # Spawns the thread hosting the fiber scheduler, starts one fiber per
    # worker, and blocks the caller until all worker fibers were scheduled.
    def start
      # Predicate guarding the condition variable: without it, a signal sent
      # before the caller starts waiting would be lost (blocking forever), and
      # a spurious wakeup would return before the fibers were scheduled.
      started = false

      @fiber_thread = Thread.start do
        Thread.current.name = "tobox-fibers-thread"

        begin
          FiberScheduler do
            @fiber_mtx.synchronize do
              @workers.each do |worker|
                @fibers << start_fiber_worker(worker)
              end
              started = true
              @fiber_cond.signal
            end
          end
        rescue KillError
          # forced shutdown requested via #stop: propagate to the worker fibers.
          @fibers.each { |f| f.raise(KillError) }
        end
      end

      @fiber_mtx.synchronize do
        @fiber_cond.wait(@fiber_mtx) until started
      end
    end

    # Stops the pool: delegates to the parent +stop+, then waits up to
    # +:shutdown_timeout+ for the fiber thread to finish. If it is still alive,
    # raises +KillError+ in it, waits up to +:grace_shutdown_timeout+, and
    # finally kills it.
    def stop
      shutdown_timeout = @configuration[:shutdown_timeout]
      grace_shutdown_timeout = @configuration[:grace_shutdown_timeout]

      super

      thread = @fiber_thread

      # nothing to wait for when the pool was never started.
      return unless thread

      thread.join(shutdown_timeout)

      return unless thread.alive?

      thread.raise(KillError)
      thread.join(grace_shutdown_timeout)
      thread.kill
      thread.join(1)
    end

    private

    # Schedules a fiber which runs +worker+ via +do_work+. Once the work
    # returns, the fiber unregisters itself and, when the worker reports being
    # finished while the pool is still running, replaces it in +@workers+ with
    # a fresh worker (same label) running in a new fiber.
    #
    # @param worker the worker to run (a +Worker+, judging by the replacement
    #   built below)
    # @return the value of +Fiber.schedule+ (the scheduled fiber)
    # @raise [Error] if the finished worker is no longer listed in +@workers+
    def start_fiber_worker(worker)
      Fiber.schedule do
        do_work(worker)

        @fiber_mtx.synchronize do
          @fibers.delete(Fiber.current)

          if worker.finished? && @running
            idx = @workers.index(worker)

            raise Error, "worker not found" unless idx

            subst_worker = Worker.new(worker.label, @configuration)
            @workers[idx] = subst_worker
            # @fiber_mtx is already held by this fiber; Mutex is not reentrant,
            # so locking it again here would raise ThreadError. Registering the
            # replacement under the lock we already own is sufficient.
            @fibers << start_fiber_worker(subst_worker)
          end
        end
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
tobox-0.6.1 lib/tobox/pool/fiber_pool.rb
tobox-0.6.0 lib/tobox/pool/fiber_pool.rb
tobox-0.5.2 lib/tobox/pool/fiber_pool.rb
tobox-0.5.1 lib/tobox/pool/fiber_pool.rb