Sha256: 318f1232c0ff7bea8a704a74ab3db7fcccd75f09eae72a6c661b6eb21a2518a4

Contents?: true

Size: 1.63 KB

Versions: 20

Compression:

Stored size: 1.63 KB

Contents

# frozen_string_literal: true

module Async
  # Channel is a wrapper around Async::Queue that provides
  # a protocol and handy tools for passing data, exceptions and closing.
  # It is designed to be used with only one publisher and one subscriber.
  #
  # Every message is stored in the underlying queue as a tagged array:
  # [:payload, value], [:exception, error] or [:close].
  #
  # Note that #close marks the channel as closed immediately, so calling
  # #each or #dequeue after #close raises ChannelClosedError even when
  # earlier messages are still sitting in the queue.
  class Channel
    # Base class for channel related errors.
    class ChannelError < StandardError; end

    # Raised when sending to or receiving from a closed channel. An instance
    # is also published to the subscriber by #close!.
    class ChannelClosedError < ChannelError; end

    def initialize
      @queue = Async::Queue.new
      @closed = false
    end

    # @return [Boolean] true once #close or #close! has been called
    def closed?
      @closed
    end

    # @return [Boolean] the opposite of #closed?
    def open?
      !closed?
    end

    # Methods for a publisher

    # Publishes a regular payload.
    #
    # @param payload [Object] value handed to the subscriber's block
    # @return whatever the underlying queue's #<< returns
    # @raise [ChannelClosedError] if the channel is closed
    def <<(payload)
      raise(ChannelClosedError, "Cannot send to a closed channel") if closed?

      @queue << [:payload, payload]
    end

    # Publishes an exception that will be raised on the subscriber side.
    #
    # @param exception [Exception] the error to raise in #each / #dequeue
    # @return whatever the underlying queue's #<< returns
    # @raise [ChannelClosedError] if the channel is closed
    def exception(exception)
      raise(ChannelClosedError, "Cannot send to a closed channel") if closed?

      @queue << [:exception, exception]
    end

    # Enqueues the close marker and marks the channel as closed.
    # Does nothing when the channel is already closed.
    def close
      return if closed?

      @queue << [:close]
      @closed = true
    end

    # Closes the channel forcefully: a ChannelClosedError is published first,
    # so a subscriber reading the queue gets it raised before the close marker.
    # Does nothing when the channel is already closed.
    def close!
      return if closed?

      exception(ChannelClosedError.new("Channel was forcefully closed"))
      close
    end

    # Methods for a subscriber

    # Receives a single payload.
    #
    # @return [Object, nil] the next payload, or nil when iteration ends
    #   without one (for example when the close marker is read first)
    # @raise [ChannelClosedError] if the channel is closed
    # @raise [Exception] an exception published with #exception
    def dequeue
      each do |payload| # rubocop:disable Lint/UnreachableLoop this is intended
        return payload
      end
    end

    # Yields every incoming payload until the close marker is read or the
    # underlying queue stops iterating.
    #
    # Without a block an Enumerator over the payloads is returned instead of
    # failing with LocalJumpError on the first payload. The enumerator calls
    # this method again when it is iterated, so the closed check is repeated.
    #
    # @yieldparam payload [Object] a value published with #<<
    # @return [Enumerator] when no block is given
    # @raise [ChannelClosedError] if the channel is closed
    # @raise [Exception] an exception published with #exception
    def each
      raise(ChannelClosedError, "Cannot receive from a closed channel") if closed?
      return enum_for(:each) unless block_given?

      @queue.each do |type, payload|
        case type
        when :exception
          # A hack to preserve full backtrace: the subscriber's call stack is
          # put in front of the backtrace the exception already carries.
          payload.set_backtrace(caller + (payload.backtrace || []))
          raise payload
        when :payload
          yield payload
        when :close
          break
        end
      end
    end
  end
end

Version data entries

20 entries across 20 versions & 1 rubygem

Version Path
grumlin-0.23.0 lib/async/channel.rb
grumlin-0.22.5 lib/async/channel.rb
grumlin-0.22.4 lib/async/channel.rb
grumlin-0.22.3 lib/async/channel.rb
grumlin-0.22.2 lib/async/channel.rb
grumlin-0.22.1 lib/async/channel.rb
grumlin-0.22.0 lib/async/channel.rb
grumlin-0.21.1 lib/async/channel.rb
grumlin-0.21.0 lib/async/channel.rb
grumlin-0.20.2 lib/async/channel.rb
grumlin-0.20.1 lib/async/channel.rb
grumlin-0.20.0 lib/async/channel.rb
grumlin-0.19.7 lib/async/channel.rb
grumlin-0.19.6 lib/async/channel.rb
grumlin-0.19.5 lib/async/channel.rb
grumlin-0.19.4 lib/async/channel.rb
grumlin-0.19.3 lib/async/channel.rb
grumlin-0.19.2 lib/async/channel.rb
grumlin-0.19.1 lib/async/channel.rb
grumlin-0.19.0 lib/async/channel.rb