Sha256: dada5b903032b84dbe7881413d24d27a94e7862e78fb3032653ed3c9c7ca1089

Contents?: true

Size: 1.75 KB

Versions: 3

Compression:

Stored size: 1.75 KB

Contents

# frozen_string_literal: true

module Async
  # A closable message channel layered on top of Async::Q.
  #
  # Every item placed on the underlying queue is a tagged tuple:
  # `[:message, value]`, `[:error, exception]` or `[:close]`. Readers unwrap
  # the tuple in #dequeue, so any value (including `nil` and `false`) can be
  # carried as a message.
  #
  # The class is declared inside `module Async` (rather than with the compact
  # `class Async::Channel` form) so that constants referenced in the body are
  # also looked up in the Async namespace.
  class Channel
    extend Forwardable

    # @return [Integer] number of #each loops currently consuming this channel
    attr_reader :subscribers

    # Base class for errors raised by the channel itself.
    class Error < StandardError; end

    # Raised when writing to a closed channel, or when reading from a channel
    # that is closed and has nothing left to deliver.
    class ChannelClosedError < Error; end

    # @param limit [Integer] forwarded to Async::Q.new as its first argument
    #   (presumably the queue capacity; confirm against Async::Q)
    # @param options [Hash] forwarded unchanged to Async::Q.new; the `:parent`
    #   entry, if present, is also remembered as the default parent for #async
    def initialize(limit = 1, **options)
      @queue = Async::Q.new(limit, **options)
      @subscribers = 0

      @parent = options[:parent]
      @closed = false
    end

    def_delegators :@queue, :count, :empty?, :length, :size, :full?
    def_delegator :self, :enqueue, :<<

    # Puts a single message on the channel.
    #
    # @param message [Object] any value, including `nil` or `false`
    # @raise [ChannelClosedError] if the channel has been closed
    def enqueue(message)
      check_channel_writeable!

      @queue << [:message, message]
    end

    # Puts several messages on the channel with one call to the queue.
    #
    # @param messages [Enumerable] values to send, in order
    # @raise [ChannelClosedError] if the channel has been closed
    def enqueue_all(messages)
      check_channel_writeable!

      @queue.enqueue_all(messages.map { |message| [:message, message] })
    end

    # Sends an exception through the channel; the reader that dequeues it
    # gets it raised from #dequeue.
    #
    # @param e [Exception] the exception to deliver
    # @raise [ChannelClosedError] if the channel has been closed
    def error(e)
      check_channel_writeable!

      @queue << [:error, e]
    end

    # Closes the channel for writing and queues one `[:close]` marker per
    # active #each subscriber so that each of them terminates.
    #
    # Closing an already closed channel is a no-op; otherwise a second call
    # would expand the queue again and leave stray close markers behind.
    def close
      return if closed?

      @closed = true

      # Presumably grows the queue so the close markers below fit without
      # blocking the closer; confirm against Async::Q#expand.
      @queue.expand(@subscribers)

      @subscribers.times { @queue << [:close] }
    end

    # @return [Boolean] true once #close has been called
    def closed?
      @closed
    end

    # @return [Boolean] true until #close has been called
    def open?
      !closed?
    end

    # Takes the next item off the channel and unwraps it.
    #
    # @return [Object] the message payload
    # @raise [ChannelClosedError] if the channel is closed and empty, or if
    #   the next item is a close marker
    # @raise [Exception] the exception previously sent with #error
    def dequeue
      check_channel_readable!

      type, payload = @queue.dequeue

      case type
      when :close then raise ChannelClosedError, "Channel was closed"
      when :error then raise payload # TODO: fix backtrace
      else payload
      end
    end

    # Yields every message until the channel is closed.
    #
    # The caller counts as a subscriber for the duration of the loop. A
    # ChannelClosedError ends the iteration quietly; any other exception
    # (including one delivered via #error) propagates to the caller.
    #
    # @yieldparam message [Object] the next message; may be `nil` or `false`
    # @return [nil] when iteration ends because the channel was closed
    def each
      # Runs before the subscriber count is touched: if the channel is
      # already closed and drained, the count must stay as it was.
      check_channel_readable!

      @subscribers += 1
      begin
        # `while true` instead of Kernel#loop, which would silently swallow a
        # StopIteration raised by the block or delivered through #error.
        while true # rubocop:disable Style/InfiniteLoop
          yield dequeue
        end
      ensure
        @subscribers -= 1
      end
    rescue ChannelClosedError
      nil
    end

    # Hands every message to `parent.async`, together with the given block.
    #
    # @param parent [#async] object used to schedule the work; defaults to
    #   the `:parent` option given to #initialize, or else the current
    #   Async::Task (presumably each message then runs in its own child task;
    #   confirm against Async::Task#async)
    # @return [nil] when the channel is closed
    def async(parent: (@parent || Async::Task.current), &block)
      each do |item|
        parent.async(item, &block)
      end
    end

    private

    # @raise [ChannelClosedError] if the channel has been closed
    def check_channel_writeable!
      raise ChannelClosedError, "Can't send to a closed channel" if closed?
    end

    # A closed channel stays readable until its remaining items are consumed.
    #
    # @raise [ChannelClosedError] if the channel is closed and empty
    def check_channel_readable!
      raise ChannelClosedError, "Cannot receive from a closed channel" if closed? && empty?
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygem

Version Path
async-tools-0.1.2 lib/async/channel.rb
async-tools-0.1.1 lib/async/channel.rb
async-tools-0.1.0 lib/async/channel.rb