lib/polyphony/core/channel.rb in polyphony-0.19 vs lib/polyphony/core/channel.rb in polyphony-0.20
- old
+ new
@@ -2,10 +2,12 @@
export_default :Channel
Exceptions = import('./exceptions')
+# Implements a unidirectional communication channel along the lines of Go
+# (buffered) channels.
class Channel
def initialize
@payload_queue = []
@waiting_queue = []
end
@@ -13,27 +15,32 @@
def close
stop = Exceptions::MoveOn.new
@waiting_queue.slice(0..-1).each { |f| f.schedule(stop) }
end
- def <<(o)
+ def <<(value)
if @waiting_queue.empty?
- @payload_queue << o
+ @payload_queue << value
else
- @waiting_queue.shift&.schedule(o)
+ @waiting_queue.shift&.schedule(value)
end
snooze
end
def receive
- EV.ref
+ Gyro.ref
if @payload_queue.empty?
@waiting_queue << Fiber.current
+ suspend
else
- payload = @payload_queue.shift
- Fiber.current.schedule(payload)
+ receive_from_queue
end
- suspend
ensure
- EV.unref
+ Gyro.unref
end
-end
\ No newline at end of file
+
+ def receive_from_queue
+ payload = @payload_queue.shift
+ snooze
+ payload
+ end
+end