Sha256: ad709ba0a5f6718e62d383f586bfcb8d08584f5b6f826599887c1e8c1310e57e

Contents?: true

Size: 663 Bytes

Versions: 4

Compression:

Stored size: 663 Bytes

Contents

# frozen_string_literal: true

export_default :Channel

Exceptions = import('./exceptions')

# An unbounded, in-memory channel for handing values from senders to
# receiving fibers.
#
# Values pushed while no fiber is waiting are buffered in FIFO order; fibers
# calling #receive while the buffer is empty are parked in FIFO order until a
# value is pushed or the channel is closed.
#
# NOTE(review): relies on `snooze`, `suspend`, `EV` and `Fiber#schedule`,
# which are defined outside this file — presumably by the surrounding
# fiber-scheduling runtime; confirm their exact semantics there.
class Channel
  def initialize
    @payload_queue = [] # values pushed while nobody was waiting
    @waiting_queue = [] # fibers parked in #receive, oldest first
  end

  # Wakes every fiber currently parked in #receive by scheduling it with an
  # Exceptions::MoveOn instance (the same instance is shared by all of them).
  def close
    stop = Exceptions::MoveOn.new
    # Detach the receivers before waking them: a fiber left in the waiting
    # queue would otherwise be handed the next pushed payload even though it
    # is no longer waiting on this channel.
    receivers = @waiting_queue.dup
    @waiting_queue.clear
    receivers.each { |f| f.schedule(stop) }
  end

  # Pushes a value into the channel.
  #
  # If a fiber is waiting, the oldest one is scheduled with the value;
  # otherwise the value is buffered. The calling fiber then calls `snooze`.
  #
  # @param o [Object] the value to send
  # @return whatever `snooze` returns
  def <<(o)
    if @waiting_queue.empty?
      @payload_queue << o
    else
      @waiting_queue.shift&.schedule(o)
    end
    snooze
  end

  # Waits for the next value.
  #
  # If a value is buffered, the current fiber schedules itself with it;
  # otherwise it registers itself as a waiter. Either way it then suspends.
  # EV.ref / EV.unref bracket the wait.
  #
  # @return whatever `suspend` returns — presumably the value the fiber was
  #   scheduled with; confirm against the scheduler
  def receive
    EV.ref
    if @payload_queue.empty?
      @waiting_queue << Fiber.current
    else
      payload = @payload_queue.shift
      Fiber.current.schedule(payload)
    end
    suspend
  ensure
    # If the wait was interrupted (an exception raised out of `suspend`), the
    # fiber may still be registered as a waiter; remove it so a later push
    # does not target a fiber that has already left #receive. This is a no-op
    # when the fiber was already shifted off by #<< or detached by #close.
    @waiting_queue.delete(Fiber.current)
    EV.unref
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
polyphony-0.19 lib/polyphony/core/channel.rb
polyphony-0.17 lib/polyphony/core/channel.rb
polyphony-0.16 lib/polyphony/core/channel.rb
polyphony-0.15 lib/polyphony/core/channel.rb