Sha256: c612cca55bcbe17c99c3f0efb2963e7deedf191353c990eae58c08a97273a546

Contents?: true

Size: 914 Bytes

Versions: 9

Compression:

Stored size: 914 Bytes

Contents

# frozen_string_literal: true

require_relative './exceptions'

module Polyphony
  # Implements a unidirectional communication channel along the lines of Go
  # (buffered) channels.
  #
  # A value pushed with #<< is handed to the longest-waiting receiver, or, when
  # no receiver is waiting, buffered until a later #receive picks it up.
  class Channel
    def initialize
      # Values sent while no receiver was waiting, oldest first.
      @payload_queue = []
      # Fibers currently blocked in #receive, oldest first.
      @waiting_queue = []
    end

    # Releases every fiber currently blocked in #receive by scheduling it with
    # a Polyphony::MoveOn instance.
    #
    # The waiting queue is emptied before the fibers are scheduled so that a
    # later #<< cannot hand a value to a fiber that has already been released.
    #
    # @return [Array] the fibers that were released
    def close
      stop = Polyphony::MoveOn.new
      waiters = @waiting_queue.dup
      @waiting_queue.clear
      waiters.each { |fiber| fiber.schedule(stop) }
    end

    # Sends a value through the channel. The value is scheduled onto the
    # oldest waiting receiver if there is one; otherwise it is buffered.
    #
    # @param value [Object] the value to send
    # @return whatever `snooze` returns
    def <<(value)
      waiter = @waiting_queue.shift
      if waiter
        waiter.schedule(value)
      else
        @payload_queue << value
      end
      # NOTE(review): snooze is defined outside this file; presumably it yields
      # control so the receiver can run - confirm.
      snooze
    end

    # Receives the next value from the channel: the oldest buffered value if
    # any, otherwise whatever `suspend` yields once this fiber is rescheduled.
    #
    # The thread's agent is ref'ed for the duration of the call and always
    # unref'ed on the way out, including when the wait is interrupted.
    #
    # @return [Object] the received value
    def receive
      Thread.current.agent.ref
      return receive_from_queue unless @payload_queue.empty?

      wait_for_value
    ensure
      Thread.current.agent.unref
    end

    # Removes and returns the oldest buffered value (nil when the buffer is
    # empty), calling `snooze` before returning it.
    #
    # @return [Object, nil]
    def receive_from_queue
      payload = @payload_queue.shift
      snooze
      payload
    end

    private

    # Registers the current fiber as a waiting receiver and suspends it.
    #
    # The fiber is always deregistered on the way out. When #<< or #close woke
    # it, it has already been removed and the delete is a no-op; when the wait
    # was interrupted some other way, this prevents a later #<< from scheduling
    # a value onto a fiber that is no longer receiving.
    #
    # NOTE(review): relies on `suspend` (defined outside this file) returning
    # the value passed to `schedule`, and raising when that value is an
    # exception - confirm.
    def wait_for_value
      fiber = Fiber.current
      @waiting_queue << fiber
      suspend
    ensure
      @waiting_queue.delete(fiber)
    end
  end
end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
polyphony-0.43.8 lib/polyphony/core/channel.rb
polyphony-0.43.6 lib/polyphony/core/channel.rb
polyphony-0.43.5 lib/polyphony/core/channel.rb
polyphony-0.43.4 lib/polyphony/core/channel.rb
polyphony-0.43.3 lib/polyphony/core/channel.rb
polyphony-0.43.2 lib/polyphony/core/channel.rb
polyphony-0.43.1 lib/polyphony/core/channel.rb
polyphony-0.43 lib/polyphony/core/channel.rb
polyphony-0.42 lib/polyphony/core/channel.rb