Sha256: 96c7cf1bd9baf87f56cfb3730880857529b61f3e6aa747533ecdac221e51a4ae

Contents?: true

Size: 1.55 KB

Versions: 1

Compression:

Stored size: 1.55 KB

Contents

# frozen_string_literal: true

module Async
  # Channel is a wrapper around Async::Queue that provides
  # a protocol and handy tools for passing data, exceptions and closing.
  # It is designed to be used with only one publisher and one subscriber
  class Channel
    class ChannelError < StandardError; end

    class ChannelClosedError < ChannelError; end

    def initialize
      @queue = Async::Queue.new
      @closed = false
    end

    def closed?
      @closed
    end

    # Methods for a publisher
    def <<(payload)
      raise(ChannelClosedError, "Cannot send to a closed channel") if @closed

      @queue << [:payload, payload]
    end

    def exception(exception)
      raise(ChannelClosedError, "Cannot send to a closed channel") if closed?

      @queue << [:exception, exception]
    end

    def close
      raise(ChannelClosedError, "Cannot close a closed channel") if closed?

      @queue << [:close]
      @closed = true
    end

    # Methods for a subscriber
    def dequeue
      each do |payload| # rubocop:disable Lint/UnreachableLoop this is intended
        return payload
      end
    end

    def each # rubocop:disable Metrics/MethodLength
      raise(ChannelClosedError, "Cannot receive from a closed channel") if closed?

      @queue.each do |type, payload|
        case type
        when :exception
          payload.set_backtrace(caller + (payload.backtrace || [])) # A hack to preserve full backtrace
          raise payload
        when :payload
          yield payload
        when :close
          break
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
grumlin-0.4.0 lib/async/channel.rb