Sha256: ce68ad2a5cb7a135778f6f8827513e672d2e5b5db6f0ddee749045c167501ee7
Contents?: true
Size: 1.88 KB
Versions: 3
Compression:
Stored size: 1.88 KB
Contents
# frozen_string_literal: true class Async::WorkerPool extend Forwardable class Error < StandardError; end class StoppedError < Error; end class << self def start(...) new(...) end def with(*args, **params, &) new(*args, **params).with(&) end end def initialize(workers: 1, queue_limit: 1, parent: Async::Task.current, &block) @queue_limit = queue_limit @parent = parent @block = block @semaphore = Async::Semaphore.new(workers, parent: @parent) @channel = Async::Channel.new(@queue_limit, parent: @semaphore) @task = start end def_delegator :@semaphore, :limit, :workers def_delegator :@semaphore, :count, :busy def_delegator :@task, :wait def_delegator :@channel, :close, :stop def_delegator :@channel, :open?, :running? def_delegator :@channel, :closed?, :stopped? def waiting @semaphore.waiting.size end def call(*args, **params, &block) block ||= @block raise ArgumentError, "Block must be passed to #schedule if it's not passed to #initlaize" if block.nil? raise StoppedError, "The pool was stopped" unless running? Async::ResultNotification.new.tap do |notification| @channel.enqueue([notification, [args, params], block]) end end def schedule_all(tasks, &block) block ||= @block raise ArgumentError, "Block must be passed to #schedule_all if it's not passed to #initlaize" if block.nil? raise StoppedError, "The pool was stopped" unless running? tasks = tasks.map { |task| [Async::ResultNotification.new, [[task], {}], block] } @channel.enqueue_all(tasks) tasks.map(&:first) end def with yield self ensure stop wait end private def start @parent.async do @channel.async do |_, (notification, (args, params), block)| notification.signal do block.call(*args, **params) end end end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
async-tools-0.1.2 | lib/async/worker_pool.rb |
async-tools-0.1.1 | lib/async/worker_pool.rb |
async-tools-0.1.0 | lib/async/worker_pool.rb |