Sha256: d3571d2147dee1da4f3020c8729fae0f2dea5ef4681e225aababfe67fee87bb6

Contents?: true

Size: 826 Bytes

Versions: 1

Compression:

Stored size: 826 Bytes

Contents

# frozen_string_literal: true

export_default :ThreadPool

require 'etc'

# Implements a pool of threads
class ThreadPool
  attr_reader :size

  def initialize(size = Etc.nprocessors)
    @size = size
    @task_queue = ::Queue.new
    @threads = (1..@size).map { Thread.new { thread_loop } }
  end

  def process(&block)
    setup unless @task_queue

    watcher = Gyro::Async.new
    @task_queue << [block, watcher]
    watcher.await
  end

  def cast(&block)
    setup unless @task_queue

    @task_queue << [block, nil]
    self
  end

  def busy?
    !@task_queue.empty?
  end

  def thread_loop
    loop { run_queued_task }
  end

  def run_queued_task
    (block, watcher) = @task_queue.pop
    result = block.()
    watcher&.signal!(result)
  rescue Exception => e
    watcher ? watcher.signal!(e) : raise(e)
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
polyphony-0.28 lib/polyphony/core/thread_pool.rb