Sha256: a975a95098282187a989f4646bd1bdd59e5f06fc239a54a89b66c6b364a8e227
Contents?: true
Size: 1.47 KB
Versions: 6
Compression:
Stored size: 1.47 KB
Contents
# frozen_string_literal: true module Async # Channel is a wrapper around Async::Queue that provides # a protocol and handy tools for passing data, exceptions and closing. # It is designed to be used with only one publisher and one subscriber class Channel class ChannelError < StandardError; end class ChannelClosedError < ChannelError; end def initialize @queue = Async::Queue.new @closed = false end def closed? @closed end # Methods for a publisher def <<(payload) raise(ChannelClosedError, "Cannot send to a closed channel") if @closed @queue << [:payload, payload] end def exception(exception) raise(ChannelClosedError, "Cannot send to a closed channel") if closed? @queue << [:exception, exception] end def close return if closed? @queue << [:close] @closed = true end # Methods for a subscriber def dequeue each do |payload| # rubocop:disable Lint/UnreachableLoop this is intended return payload end end def each raise(ChannelClosedError, "Cannot receive from a closed channel") if closed? @queue.each do |type, payload| case type when :exception payload.set_backtrace(caller + (payload.backtrace || [])) # A hack to preserve full backtrace raise payload when :payload yield payload when :close break end end end end end
Version data entries
6 entries across 6 versions & 1 rubygems