Sha256: c612cca55bcbe17c99c3f0efb2963e7deedf191353c990eae58c08a97273a546
Contents?: true
Size: 914 Bytes
Versions: 9
Compression:
Stored size: 914 Bytes
Contents
# frozen_string_literal: true require_relative './exceptions' module Polyphony # Implements a unidirectional communication channel along the lines of Go # (buffered) channels. class Channel def initialize @payload_queue = [] @waiting_queue = [] end def close stop = Polyphony::MoveOn.new @waiting_queue.slice(0..-1).each { |f| f.schedule(stop) } end def <<(value) if @waiting_queue.empty? @payload_queue << value else @waiting_queue.shift&.schedule(value) end snooze end def receive Thread.current.agent.ref if @payload_queue.empty? @waiting_queue << Fiber.current suspend else receive_from_queue end ensure Thread.current.agent.unref end def receive_from_queue payload = @payload_queue.shift snooze payload end end end
Version data entries
9 entries across 9 versions & 1 rubygems