Sha256: c98c49976ed2634608918c1a8cac22b4cd865a5f8fdab262e5e05ef82aa31894
Contents?: true
Size: 1.52 KB
Versions: 5
Compression:
Stored size: 1.52 KB
Contents
# frozen_string_literal: true module Async # Channel is a wrapper around Async::Queue that provides # a protocol and handy tools for passing data, exceptions and closing. # It is designed to be used with only one publisher and one subscriber class Channel class ChannelError < StandardError; end class ChannelClosedError < ChannelError; end def initialize @queue = Async::Queue.new @closed = false end def closed? @closed end # Methods for a publisher def <<(payload) raise(ChannelClosedError, "Cannot send to a closed channel") if @closed @queue << [:payload, payload] end def exception(exception) raise(ChannelClosedError, "Cannot send to a closed channel") if closed? @queue << [:exception, exception] end def close raise(ChannelClosedError, "Cannot close a closed channel") if closed? @queue << [:close] @closed = true end # Methods for a subscriber def dequeue each do |payload| # rubocop:disable Lint/UnreachableLoop this is intended return payload end end def each raise(ChannelClosedError, "Cannot receive from a closed channel") if closed? @queue.each do |type, payload| case type when :exception payload.set_backtrace(caller + (payload.backtrace || [])) # A hack to preserve full backtrace raise payload when :payload yield payload when :close break end end end end end
Version data entries
5 entries across 5 versions & 1 rubygems
Version | Path |
---|---|
grumlin-0.6.2 | lib/async/channel.rb |
grumlin-0.6.1 | lib/async/channel.rb |
grumlin-0.6.0 | lib/async/channel.rb |
grumlin-0.5.1 | lib/async/channel.rb |
grumlin-0.5.0 | lib/async/channel.rb |