Sha256: 631b1ef43e0bb451a0f5459c9ba4e4e3c6464bbdca7fa04889587c53d4481378

Contents?: true

Size: 920 Bytes

Versions: 1

Compression:

Stored size: 920 Bytes

Contents

# frozen_string_literal: true

export_default :ThreadPool

require 'etc'

# Implements a pool of threads
#
# A fixed number of worker threads is started when the pool is created. Each
# worker repeatedly pops a [block, watcher] pair off a shared queue and calls
# the block. Tasks queued with #process carry a watcher that is signalled with
# the outcome; tasks queued with #cast carry no watcher (fire-and-forget).
class ThreadPool
  # @return [Integer] the number of worker threads started by this pool
  attr_reader :size

  # Runs the given block on a class-wide default pool, which is created with
  # the default size on first use.
  #
  # @return whatever #process returns for the block
  def self.process(&block)
    @default_pool ||= new
    @default_pool.process(&block)
  end

  # Creates the task queue and starts the worker threads.
  #
  # @param size [Integer] number of worker threads to start; defaults to the
  #   value of Etc.nprocessors
  def initialize(size = Etc.nprocessors)
    @size = size
    @task_queue = ::Queue.new
    @threads = (1..@size).map { Thread.new { thread_loop } }
  end

  # Queues the block together with a new Gyro::Async watcher, then awaits the
  # watcher. The worker that runs the block signals the watcher with the
  # block's return value, or with the exception the block raised.
  #
  # NOTE(review): presumably `await` suspends the caller until the watcher is
  # signalled and hands back the signalled value - confirm against Gyro::Async.
  #
  # @return the value of `watcher.await`
  def process(&block)
    watcher = Gyro::Async.new
    @task_queue << [block, watcher]
    watcher.await
  end

  # Queues the block for execution without waiting for, or collecting, its
  # result. A StandardError raised by the block is reported on stderr by the
  # worker and does not stop the worker (see #run_queued_task).
  #
  # @return [ThreadPool] self, so calls can be chained
  def cast(&block)
    @task_queue << [block, nil]
    self
  end

  # @return [Boolean] true when tasks are waiting in the queue. Tasks that a
  #   worker has already popped and is currently running are not counted.
  def busy?
    !@task_queue.empty?
  end

  # Body of every worker thread: runs queued tasks one after another. The
  # loop ends only when #run_queued_task lets an exception escape.
  def thread_loop
    loop { run_queued_task }
  end

  # Pops one task (blocking while the queue is empty), calls it, and signals
  # the task's watcher, if any, with the result.
  #
  # Exceptions are handled as follows:
  # - task with a watcher: the watcher is signalled with the exception;
  # - task without a watcher raising a StandardError: the error is reported
  #   and swallowed, so one failing fire-and-forget task cannot permanently
  #   remove a worker from the pool;
  # - anything else is re-raised and terminates the worker thread.
  def run_queued_task
    block, watcher = @task_queue.pop
    result = block.()
    watcher&.signal!(result)
  rescue Exception => e # rubocop:disable Lint/RescueException
    # Exception (not just StandardError) is rescued on purpose: whatever the
    # task raised has to reach the watcher that is waiting for it.
    if watcher
      watcher.signal!(e)
    elsif contain_cast_failure?(block, e)
      report_cast_failure(e)
    else
      raise e
    end
  end

  private

  # Decides whether an exception from a watcher-less task may be swallowed.
  # It may not when no task was actually popped (block is nil), when the
  # exception is not a StandardError, or when the user asked for thread
  # failures to abort the process via abort_on_exception.
  def contain_cast_failure?(block, error)
    return false unless block && error.is_a?(StandardError)

    !(Thread.abort_on_exception || Thread.current.abort_on_exception)
  end

  # Writes the error and its backtrace to stderr, unless exception reporting
  # has been switched off for the current worker thread.
  def report_cast_failure(error)
    return unless Thread.current.report_on_exception

    headline = "#{self.class}: cast task raised #{error.class}: #{error.message}"
    warn [headline, *error.backtrace].join("\n\tfrom ")
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
polyphony-0.29 lib/polyphony/core/thread_pool.rb