lib/async/channel.rb in grumlin-0.23.0 vs lib/async/channel.rb in grumlin-1.0.0.rc1

- old
+ new

@@ -1,75 +1,73 @@ # frozen_string_literal: true -module Async - # Channel is a wrapper around Async::Queue that provides - # a protocol and handy tools for passing data, exceptions and closing. - # It is designed to be used only with one publisher and one subscriber - class Channel - class ChannelError < StandardError; end +# Channel is a wrapper around Async::Queue that provides +# a protocol and handy tools for passing data, exceptions and closing. +# It is designed to be used only with one publisher and one subscriber +class Async::Channel + class ChannelError < StandardError; end - class ChannelClosedError < ChannelError; end + class ChannelClosedError < ChannelError; end - def initialize - @queue = Async::Queue.new - @closed = false - end + def initialize + @queue = Async::Queue.new + @closed = false + end - def closed? - @closed - end + def closed? + @closed + end - def open? - !@closed - end + def open? + !@closed + end - # Methods for a publisher - def <<(payload) - raise(ChannelClosedError, "Cannot send to a closed channel") if @closed + # Methods for a publisher + def <<(payload) + raise(ChannelClosedError, "Cannot send to a closed channel") if @closed - @queue << [:payload, payload] - end + @queue << [:payload, payload] + end - def exception(exception) - raise(ChannelClosedError, "Cannot send to a closed channel") if closed? + def exception(exception) + raise(ChannelClosedError, "Cannot send to a closed channel") if closed? - @queue << [:exception, exception] - end + @queue << [:exception, exception] + end - def close - return if closed? + def close + return if closed? - @queue << [:close] - @closed = true - end + @queue << [:close] + @closed = true + end - def close! - return if closed? + def close! + return if closed? - exception(ChannelClosedError.new("Channel was forcefully closed")) - close - end + exception(ChannelClosedError.new("Channel was forcefully closed")) + close + end - # Methods for a subscriber - def dequeue - each do |payload| # rubocop:disable Lint/UnreachableLoop this is intended - return payload - end + # Methods for a subscriber + def dequeue + each do |payload| # rubocop:disable Lint/UnreachableLoop this is intended + return payload end + end - def each - raise(ChannelClosedError, "Cannot receive from a closed channel") if closed? + def each + raise(ChannelClosedError, "Cannot receive from a closed channel") if closed? - @queue.each do |type, payload| - case type - when :exception - payload.set_backtrace(caller + (payload.backtrace || [])) # A hack to preserve full backtrace - raise payload - when :payload - yield payload - when :close - break - end + @queue.each do |type, payload| + case type + when :exception + payload.set_backtrace(caller + (payload.backtrace || [])) # A hack to preserve full backtrace + raise payload + when :payload + yield payload + when :close + break end end end end