Sha256: f4b95e25c16677f2601039bbffeb063f6d15d379038db65d76bbddcc238faa90

Contents?: true

Size: 1.17 KB

Versions: 3

Compression:

Stored size: 1.17 KB

Contents

# frozen_string_literal: true

require 'etc'

module Polyphony
  # A fixed-size pool of worker threads that run blocks taken from a shared
  # task queue.
  #
  # Work is submitted with #process, which awaits a watcher that the worker
  # signals when the block is done, or with #cast, which only enqueues the
  # block. The class also keeps a lazily created default pool, used by
  # ThreadPool.process and discarded by ThreadPool.reset.
  class ThreadPool
    # @return [Integer] the number of worker threads requested at construction
    attr_reader :size

    # Runs the given block on the default pool, creating that pool (with the
    # default size) on first use.
    #
    # @return whatever #process returns for the block
    def self.process(&block)
      @default_pool ||= new
      @default_pool.process(&block)
    end

    # Stops the default pool and forgets it, so the next ThreadPool.process
    # call builds a fresh one. Does nothing if no default pool exists.
    #
    # @return [nil]
    def self.reset
      return unless @default_pool

      @default_pool.stop
      @default_pool = nil
    end

    # Creates the task queue and starts the worker threads, each of which runs
    # #thread_loop.
    #
    # @param size [Integer] number of worker threads to start; defaults to
    #   Etc.nprocessors
    def initialize(size = Etc.nprocessors)
      @size = size
      @task_queue = Gyro::Queue.new
      @threads = (1..@size).map { Thread.new { thread_loop } }
    end

    # Queues the block together with a watcher obtained from the current fiber
    # and awaits that watcher. The worker signals the watcher with the block's
    # return value, or with the exception the block raised.
    #
    # @return the value of the watcher's #await (presumably the block's result,
    #   with a signalled exception re-raised - confirm against the watcher
    #   implementation)
    def process(&block)
      async = Fiber.current.auto_async
      @task_queue << [block, async]
      async.await
    end

    # Queues the block without a watcher (fire and forget): nothing waits for
    # it and its return value is discarded.
    #
    # @return [ThreadPool] self, so calls can be chained
    def cast(&block)
      @task_queue << [block, nil]
      self
    end

    # Tells whether tasks are waiting in the queue. Only the queue is consulted;
    # a task that a worker has already popped is presumably no longer counted.
    #
    # @return [Boolean] true if the task queue is not empty
    def busy?
      !@task_queue.empty?
    end

    # Body of every worker thread: runs queued tasks one after another, forever.
    def thread_loop
      loop { run_queued_task }
    end

    # Pops one [block, watcher] pair off the queue, calls the block and, when a
    # watcher is present, signals it with the block's result.
    #
    # An exception raised while running the task is signalled to the watcher
    # instead. A #cast task has no watcher to hand the exception to, so it is
    # passed to #report_cast_error rather than being allowed to unwind the
    # worker's loop.
    def run_queued_task
      block, watcher = @task_queue.pop
      begin
        result = block.call
        watcher&.signal(result)
      rescue Exception => e # rubocop:disable Lint/RescueException
        # Everything is rescued on purpose: whatever went wrong, a #process
        # caller awaiting the watcher must still be signalled.
        watcher ? watcher.signal(e) : report_cast_error(e)
      end
    end

    # Kills every worker thread, then waits for each of them to terminate.
    #
    # @return [Array<Thread>] the (now terminated) worker threads
    def stop
      @threads.each(&:kill)
      @threads.each(&:join)
    end

    private

    # Deals with an exception raised by a #cast task on the current worker.
    #
    # A StandardError is written to stderr and otherwise dropped, so the worker
    # keeps serving the queue; letting it escape would end the thread and
    # shrink the pool for good. Any other exception, and every exception when
    # abort_on_exception is enabled globally or for this thread, is re-raised.
    #
    # @param error [Exception] the exception raised by the task
    # @raise [Exception] +error+ itself, in the cases described above
    def report_cast_error(error)
      fatal = !error.is_a?(StandardError)
      raise error if fatal || Thread.abort_on_exception || Thread.current.abort_on_exception

      headline = "#{self.class}: #{error.class} raised in cast task: #{error.message}"
      # backtrace may be nil for an exception that was never raised; the splat
      # then contributes nothing.
      warn([headline, *error.backtrace].join("\n\t"))
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygem

Version Path
polyphony-0.40 lib/polyphony/core/thread_pool.rb
polyphony-0.39 lib/polyphony/core/thread_pool.rb
polyphony-0.38 lib/polyphony/core/thread_pool.rb