Sha256: ee452a627896e0b967b0a2fc1290c1a5d87a562fdc70726750a87266d2f40c64

Contents?: true

Size: 1.71 KB

Versions: 3

Compression:

Stored size: 1.71 KB

Contents

# frozen_string_literal: true

require "forwardable"

require "async/notification"

class Async::Q
  extend Forwardable

  attr_reader :items, :limit

  def initialize(limit = Float::INFINITY, items: [], parent: nil)
    @limit = limit || Float::INFINITY
    @parent = parent

    raise ArgumentError, "Items size is greter than limit: #{items.count} > #{@limit}" if items.count > @limit

    @items = items
    @any_notification = Async::Notification.new
    @free_notification = Async::Notification.new
  end

  def_delegators :@items, :count, :empty?, :length, :size

  def_delegator :self, :enqueue, :<<
  def_delegator :self, :full?, :limited?
  def_delegator :self, :resize, :scale

  def full?
    size >= @limit
  end

  def resize(new_limit)
    if new_limit > @limit
      @limit = new_limit
      @free_notification.signal
    elsif new_limit <= 0
      raise ArgumentError, "Limit cannot be <= 0: #{new_limit}"
    elsif size > new_limit
      raise ArgumentError, "New limit cannot be lower than the current size: #{size} > #{new_limit}"
    else
      @limit = new_limit
    end
  end

  def expand(n)
    resize(limit + n)
  end

  def shrink(n)
    resize(limit - n)
  end

  def enqueue(item)
    @free_notification.wait while full?

    @items.push(item)
    @any_notification.signal
  end

  def enqueue_all(items)
    items.each { |item| enqueue(item) }
  end

  def dequeue
    @any_notification.wait while empty?

    item = @items.shift

    @free_notification.signal

    item
  end

  def each
    while item = dequeue # rubocop:disable Lint/AssignmentInCondition
      yield(item)
    end
  end

  def async(parent: (@parent || Task.current), &block)
    each do |item|
      parent.async(item, &block)
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
async-tools-0.1.2 lib/async/q.rb
async-tools-0.1.1 lib/async/q.rb
async-tools-0.1.0 lib/async/q.rb