Sha256: c460cc1110a53a63f53bd509e90385bb4e819c4e16f9adaf515e6a30818a5271
Contents?: true
Size: 881 Bytes
Versions: 3
Compression:
Stored size: 881 Bytes
Contents
# frozen_string_literal: true require_relative './exceptions' module Polyphony # Implements a unidirectional communication channel along the lines of Go # (buffered) channels. class Channel def initialize @payload_queue = [] @waiting_queue = [] end def close stop = Polyphony::MoveOn.new @waiting_queue.slice(0..-1).each { |f| f.schedule(stop) } end def <<(value) if @waiting_queue.empty? @payload_queue << value else @waiting_queue.shift&.schedule(value) end snooze end def receive Gyro.ref if @payload_queue.empty? @waiting_queue << Fiber.current suspend else receive_from_queue end ensure Gyro.unref end def receive_from_queue payload = @payload_queue.shift snooze payload end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
polyphony-0.40 | lib/polyphony/core/channel.rb |
polyphony-0.39 | lib/polyphony/core/channel.rb |
polyphony-0.38 | lib/polyphony/core/channel.rb |