Sha256: ed79fd23a01b1f7d64c23ab397b46a9ab434bf84673ad940fea94a9d322657dd

Contents?: true

Size: 819 Bytes

Versions: 16

Compression:

Stored size: 819 Bytes

Contents

# frozen_string_literal: true

export_default :Channel

Exceptions = import('./exceptions')

# Implements a unidirectional communication channel along the lines of Go
# (buffered) channels.
#
# Values pushed with #<< are handed directly to a fiber waiting in #receive
# when there is one; otherwise they are buffered until a receiver asks for
# them. Fibers are tracked in a FIFO waiting queue and are removed from it as
# soon as they stop waiting, whatever the reason.
class Channel
  def initialize
    @payload_queue = [] # values pushed while no receiver was waiting
    @waiting_queue = [] # fibers currently suspended inside #receive
  end

  # Releases every fiber currently waiting in #receive by scheduling an
  # Exceptions::MoveOn instance on it, and empties the waiting queue.
  #
  # The queue is drained *before* the fibers are scheduled so that a later
  # #<< cannot hand a value to a fiber that is no longer receiving.
  # NOTE(review): presumably the scheduled exception is raised from the
  # waiter's pending `suspend` call - confirm against the scheduler.
  def close
    stop = Exceptions::MoveOn.new
    @waiting_queue.shift(@waiting_queue.size).each { |fiber| fiber.schedule(stop) }
  end

  # Pushes a value into the channel.
  #
  # If a fiber is waiting, the one that has waited longest is scheduled with
  # the value; otherwise the value is appended to the buffer. `snooze` is then
  # called in both cases, and its result is the return value.
  #
  # @param value [Object] the value to send
  def <<(value)
    if @waiting_queue.empty?
      @payload_queue << value
    else
      @waiting_queue.shift&.schedule(value)
    end
    snooze
  end

  # Receives the next value from the channel.
  #
  # A buffered value is returned if one is available. Otherwise the current
  # fiber registers itself as a waiter and calls `suspend`, whose result is
  # returned. Gyro.ref / Gyro.unref bracket the whole call.
  #
  # @return [Object] the buffered value, or whatever `suspend` returns
  def receive
    Gyro.ref
    if @payload_queue.empty?
      fiber = Fiber.current
      @waiting_queue << fiber
      begin
        suspend
      ensure
        # A no-op when #<< already shifted this fiber out. When the wait ended
        # by an exception instead, this prevents a stale entry from swallowing
        # a value pushed later.
        @waiting_queue.delete(fiber)
      end
    else
      receive_from_queue
    end
  ensure
    Gyro.unref
  end

  # Removes and returns the oldest buffered value, calling `snooze` before
  # returning it.
  #
  # @return [Object, nil] the oldest buffered value, or nil if the buffer is
  #   empty
  def receive_from_queue
    payload = @payload_queue.shift
    snooze
    payload
  end
end

Version data entries

16 entries across 16 versions & 1 rubygems

Version Path
polyphony-0.36 lib/polyphony/core/channel.rb
polyphony-0.34 lib/polyphony/core/channel.rb
polyphony-0.33 lib/polyphony/core/channel.rb
polyphony-0.32 lib/polyphony/core/channel.rb
polyphony-0.31 lib/polyphony/core/channel.rb
polyphony-0.30 lib/polyphony/core/channel.rb
polyphony-0.29 lib/polyphony/core/channel.rb
polyphony-0.28 lib/polyphony/core/channel.rb
polyphony-0.27 lib/polyphony/core/channel.rb
polyphony-0.26 lib/polyphony/core/channel.rb
polyphony-0.25 lib/polyphony/core/channel.rb
polyphony-0.24 lib/polyphony/core/channel.rb
polyphony-0.23 lib/polyphony/core/channel.rb
polyphony-0.22 lib/polyphony/core/channel.rb
polyphony-0.21 lib/polyphony/core/channel.rb
polyphony-0.20 lib/polyphony/core/channel.rb