Sha256: 417e1baa9a7a1957e4e6cf70fde6301eb3cf997ab1c0bd2fd31ee850c4d7e570

Contents?: true

Size: 1.77 KB

Versions: 3

Compression:

Stored size: 1.77 KB

Contents

# frozen_string_literal: true

class Async::Channel
  attr_reader :subscribers

  class Error < StandardError; end

  class ChannelClosedError < Error; end

  def initialize(limit = 1, **options)
    @queue = Async::Q.new(limit, **options)
    @subscribers = 0

    @parent = options[:parent]
    @closed = false
  end

  def count = @queue.count
  def length = @queue.length
  def size = @queue.size

  def full? = @queue.full?
  def empty? = @queue.empty?
  def closed? = @closed
  def open? = !closed?

  def <<(item) = enqueue(item)

  def enqueue(message)
    check_channel_writeable!

    @queue << [:message, message]
  end

  def enqueue_all(messages)
    check_channel_writeable!

    @queue.enqueue_all(messages.map { |message| [:message, message] })
  end

  def error(e)
    check_channel_writeable!

    @queue << [:error, e]
  end

  def close
    @closed = true

    @queue.expand(@subscribers)

    @subscribers.times do
      @queue << [:close]
    end
  end

  def dequeue
    check_channel_readable!

    type, message = @queue.dequeue
    raise ChannelClosedError, "Channel was closed" if type == :close
    raise message if type == :error # TODO: fix backtrace

    message
  end

  def each
    check_channel_readable!

    @subscribers += 1
    while message = dequeue # rubocop:disable Lint/AssignmentInCondition
      yield message
    end
  rescue ChannelClosedError
    nil
  ensure
    @subscribers -= 1
  end

  def async(parent: (@parent || Task.current), &block)
    each do |item|
      parent.async(item, &block)
    end
  end

  private

  def check_channel_writeable!
    raise ChannelClosedError, "Can't send to a closed channel" if closed?
  end

  def check_channel_readable!
    raise ChannelClosedError, "Cannot receive from a closed channel" if closed? && empty?
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
async-tools-0.1.5 lib/async/channel.rb
async-tools-0.1.4 lib/async/channel.rb
async-tools-0.1.3 lib/async/channel.rb