Sha256: 0fec22289b6df28a91a51701a5448cab5951c8f09a7f8776c3ab4c80d20869d4

Contents?: true

Size: 1.51 KB

Versions: 15

Compression:

Stored size: 1.51 KB

Contents

# frozen_string_literal: true

# Channel wraps an Async::Queue and layers a small message protocol on top of
# it, so payloads, exceptions and a close notification all travel through the
# same queue.
# It is designed to be used only with one publisher and one subscriber.
#
# Every queue entry is an array whose first element is a tag:
#   [:payload, object]  - a regular value handed to the subscriber
#   [:exception, error] - an error that is raised on the subscriber side
#   [:close]            - tells the subscriber to stop iterating
class Async::Channel
  # Base class for channel related errors.
  class ChannelError < StandardError
  end

  # Raised when a closed channel is written to or read from; #close! also
  # sends an instance of it through the channel.
  class ChannelClosedError < ChannelError
  end

  # Creates an open channel backed by a new Async::Queue.
  def initialize
    @queue = Async::Queue.new
    @closed = false
  end

  # True once #close or #close! has been called.
  def closed?
    @closed
  end

  # Negation of #closed?.
  def open?
    !closed?
  end

  # ---- Publisher side ----

  # Enqueues +payload+ for the subscriber.
  # Raises ChannelClosedError when the channel is already closed.
  def <<(payload)
    raise ChannelClosedError, "Cannot send to a closed channel" if closed?

    @queue << [:payload, payload]
  end

  # Enqueues +exception+ so that it gets raised inside the subscriber's #each.
  # Raises ChannelClosedError when the channel is already closed.
  def exception(exception)
    raise ChannelClosedError, "Cannot send to a closed channel" if closed?

    @queue << [:exception, exception]
  end

  # Enqueues the close marker and flags the channel as closed.
  # Does nothing (and returns nil) when the channel is closed already.
  def close
    return if closed?

    @queue << [:close]
    @closed = true
  end

  # Like #close, but first enqueues a ChannelClosedError, so a subscriber that
  # is still iterating gets an exception instead of a silent stop.
  # Does nothing (and returns nil) when the channel is closed already.
  def close!
    return if closed?

    forced = ChannelClosedError.new("Channel was forcefully closed")
    exception(forced)
    close
  end

  # ---- Subscriber side ----

  # Returns the first payload delivered by #each. The early return from the
  # block is deliberate. When the close marker arrives before any payload,
  # the result of #each is returned instead.
  def dequeue
    each { |item| return item } # rubocop:disable Lint/UnreachableLoop
  end

  # Iterates over the queued messages, yielding the content of every
  # :payload entry. An :exception entry is raised in the caller, a :close
  # entry ends the iteration, entries with any other tag are ignored.
  # Raises ChannelClosedError when the channel is already closed.
  def each
    raise ChannelClosedError, "Cannot receive from a closed channel" if closed?

    @queue.each do |kind, body|
      case kind
      when :payload then yield body
      when :close then break
      when :exception
        # A hack to preserve the full backtrace: the subscriber's call stack
        # is put in front of whatever backtrace the exception already has.
        body.set_backtrace([*caller, *body.backtrace])
        raise body
      end
    end
  end
end

Version data entries

15 entries across 15 versions & 1 rubygem

Version Path
grumlin-1.2.0 lib/async/channel.rb
grumlin-1.1.0 lib/async/channel.rb
grumlin-1.0.4 lib/async/channel.rb
grumlin-1.0.3 lib/async/channel.rb
grumlin-1.0.3.beta1 lib/async/channel.rb
grumlin-1.0.2 lib/async/channel.rb
grumlin-1.0.1 lib/async/channel.rb
grumlin-1.0.0 lib/async/channel.rb
grumlin-1.0.0.rc7 lib/async/channel.rb
grumlin-1.0.0.rc6 lib/async/channel.rb
grumlin-1.0.0.rc5 lib/async/channel.rb
grumlin-1.0.0.rc4 lib/async/channel.rb
grumlin-1.0.0.rc3 lib/async/channel.rb
grumlin-1.0.0.rc2 lib/async/channel.rb
grumlin-1.0.0.rc1 lib/async/channel.rb