Sha256: 96c7cf1bd9baf87f56cfb3730880857529b61f3e6aa747533ecdac221e51a4ae
Contents?: true
Size: 1.55 KB
Versions: 1
Compression:
Stored size: 1.55 KB
Contents
# frozen_string_literal: true

module Async
  # Channel is a wrapper around Async::Queue that provides
  # a protocol and handy tools for passing data, exceptions and closing.
  # It is designed to be used with only one publisher and one subscriber.
  #
  # Every message is put on the queue as a tagged array:
  # [:payload, value], [:exception, error] or [:close].
  class Channel
    # Base class for errors raised by the channel itself.
    class ChannelError < StandardError; end

    # Raised when a closed channel is written to, read from or closed again.
    class ChannelClosedError < ChannelError; end

    # Creates an open channel backed by a fresh Async::Queue.
    def initialize
      @queue = Async::Queue.new
      @closed = false
    end

    # @return [Boolean] true once #close has been called on this channel
    def closed?
      @closed
    end

    # Methods for a publisher

    # Sends a payload to the subscriber.
    #
    # @param payload [Object] value to hand over to the subscriber
    # @return [Object] whatever the underlying queue's << returns
    # @raise [ChannelClosedError] if the channel is already closed
    def <<(payload)
      raise(ChannelClosedError, "Cannot send to a closed channel") if closed?

      @queue << [:payload, payload]
    end

    # Sends an exception to the subscriber; it is re-raised on the
    # subscriber's side when the message is read by #each or #dequeue.
    #
    # @param exception [Exception] the error to deliver
    # @return [Object] whatever the underlying queue's << returns
    # @raise [ChannelClosedError] if the channel is already closed
    def exception(exception)
      raise(ChannelClosedError, "Cannot send to a closed channel") if closed?

      @queue << [:exception, exception]
    end

    # Enqueues the close marker and marks the channel as closed.
    #
    # @return [true]
    # @raise [ChannelClosedError] if the channel is already closed
    def close
      raise(ChannelClosedError, "Cannot close a closed channel") if closed?

      @queue << [:close]
      @closed = true
    end

    # Methods for a subscriber

    # Receives a single payload.
    #
    # @return [Object, nil] the next payload; when the iteration ends without
    #   yielding a payload (e.g. on the close marker) the result of #each
    # @raise [ChannelClosedError] if the channel is already closed
    # @raise [Exception] an exception sent by the publisher via #exception
    def dequeue
      each do |payload| # rubocop:disable Lint/UnreachableLoop this is intended
        return payload
      end
    end

    # Iterates over incoming messages, yielding every payload until the close
    # marker is read or an exception sent by the publisher is re-raised.
    #
    # @yieldparam payload [Object] a value sent with #<<
    # @return [Enumerator] when no block is given; the enumerator calls this
    #   method again (including the closed check) once it is iterated
    # @return [nil] when the iteration is ended by the close marker
    # @raise [ChannelClosedError] if the channel is already closed
    # @raise [Exception] an exception sent by the publisher via #exception
    def each # rubocop:disable Metrics/MethodLength
      raise(ChannelClosedError, "Cannot receive from a closed channel") if closed?
      # Without a block `yield` would only fail after a message had already
      # been taken off the queue, so hand back an Enumerator instead.
      return enum_for(:each) unless block_given?

      @queue.each do |type, payload|
        case type
        when :exception
          # A hack to preserve full backtrace: the subscriber's call stack is
          # put in front of the exception's own backtrace (if it has one).
          payload.set_backtrace(caller + (payload.backtrace || []))
          raise payload
        when :payload
          yield payload
        when :close
          break
        end
      end
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
grumlin-0.4.0 | lib/async/channel.rb |