Sha256: c460cc1110a53a63f53bd509e90385bb4e819c4e16f9adaf515e6a30818a5271

Contents?: true

Size: 881 Bytes

Versions: 3

Compression:

Stored size: 881 Bytes

Contents

# frozen_string_literal: true

require_relative './exceptions'

module Polyphony
  # Implements a unidirectional communication channel along the lines of Go
  # (buffered) channels.
  #
  # A value pushed with #<< is handed to the longest-waiting receiver when one
  # exists; otherwise it is buffered until some fiber calls #receive.
  #
  # NOTE(review): `snooze`, `suspend`, `Gyro` and `Fiber#schedule` are defined
  # outside this file; comments about them below are assumptions to confirm.
  class Channel
    def initialize
      @payload_queue = [] # values sent while no receiver was waiting (FIFO)
      @waiting_queue = [] # fibers parked in #receive, oldest first
    end

    # Wakes every fiber currently parked in #receive by scheduling it with a
    # single Polyphony::MoveOn instance, and forgets those fibers so that a
    # later #<< cannot hand a value to a fiber that is no longer waiting.
    #
    # @return [Array] the fibers that were scheduled
    def close
      stop = Polyphony::MoveOn.new
      # Detach the queue first: once scheduled with `stop` these fibers are no
      # longer receivers, so they must not remain reachable from #<<.
      waiting = @waiting_queue
      @waiting_queue = []
      waiting.each { |fiber| fiber.schedule(stop) }
    end

    # Sends a value through the channel. If a fiber is waiting, the oldest one
    # is scheduled with the value; otherwise the value is buffered.
    #
    # @param value [Object] the value to deliver
    # @return [Object] whatever `snooze` returns
    def <<(value)
      if @waiting_queue.empty?
        @payload_queue << value
      else
        @waiting_queue.shift.schedule(value)
      end
      # Presumably yields so the receiver gets a chance to run - confirm.
      snooze
    end

    # Receives the next value from the channel. Returns a buffered value if one
    # is available; otherwise parks the current fiber until it is resumed.
    #
    # Gyro.ref / Gyro.unref always bracket the call (the unref runs in an
    # ensure clause, so it also runs when the wait ends with an exception).
    #
    # @return [Object] the buffered value, or the result of `suspend`
    def receive
      Gyro.ref
      return receive_from_queue unless @payload_queue.empty?

      wait_for_payload
    ensure
      Gyro.unref
    end

    # Removes and returns the oldest buffered value, calling `snooze` before
    # returning it. Returns nil when nothing is buffered.
    #
    # @return [Object, nil] the oldest buffered value
    def receive_from_queue
      payload = @payload_queue.shift
      snooze
      payload
    end

    private

    # Parks the current fiber in the waiting queue and returns the result of
    # `suspend`.
    def wait_for_payload
      fiber = Fiber.current
      @waiting_queue << fiber
      suspend
    ensure
      # On a normal hand-off #<< has already shifted this fiber out, so this is
      # a no-op. If the wait ended any other way (e.g. `suspend` raised), drop
      # the stale entry so a later #<< does not deliver a value to it.
      @waiting_queue.delete(fiber)
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
polyphony-0.40 lib/polyphony/core/channel.rb
polyphony-0.39 lib/polyphony/core/channel.rb
polyphony-0.38 lib/polyphony/core/channel.rb