Sha256: a975a95098282187a989f4646bd1bdd59e5f06fc239a54a89b66c6b364a8e227

Contents?: true

Size: 1.47 KB

Versions: 6

Compression:

Stored size: 1.47 KB

Contents

# frozen_string_literal: true

module Async
  # Channel is a wrapper around Async::Queue that provides
  # a protocol and handy tools for passing data, exceptions and closing.
  # It is designed to be used with only one publisher and one subscriber
  class Channel
    class ChannelError < StandardError; end

    class ChannelClosedError < ChannelError; end

    def initialize
      @queue = Async::Queue.new
      @closed = false
    end

    def closed?
      @closed
    end

    # Methods for a publisher
    def <<(payload)
      raise(ChannelClosedError, "Cannot send to a closed channel") if @closed

      @queue << [:payload, payload]
    end

    def exception(exception)
      raise(ChannelClosedError, "Cannot send to a closed channel") if closed?

      @queue << [:exception, exception]
    end

    def close
      return if closed?

      @queue << [:close]
      @closed = true
    end

    # Methods for a subscriber
    def dequeue
      each do |payload| # rubocop:disable Lint/UnreachableLoop this is intended
        return payload
      end
    end

    def each
      raise(ChannelClosedError, "Cannot receive from a closed channel") if closed?

      @queue.each do |type, payload|
        case type
        when :exception
          payload.set_backtrace(caller + (payload.backtrace || [])) # A hack to preserve full backtrace
          raise payload
        when :payload
          yield payload
        when :close
          break
        end
      end
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
grumlin-0.11.0 lib/async/channel.rb
grumlin-0.10.1 lib/async/channel.rb
grumlin-0.10.0 lib/async/channel.rb
grumlin-0.9.0 lib/async/channel.rb
grumlin-0.8.0 lib/async/channel.rb
grumlin-0.7.0 lib/async/channel.rb