Sha256: 7d9ab126264c5aab309d97ecca30bc1e230ed6b7625cd02505a62292a1a26ba5
Contents?: true
Size: 999 Bytes
Versions: 2
Compression:
Stored size: 999 Bytes
Contents
# frozen_string_literal: true class RactorPool::Channel def initialize # rubocop:disable Metrics/MethodLength @pipe = Ractor.new do listeners_count = 0 loop do msg = Ractor.recv case msg in type: :close listeners_count.times { Ractor.yield(msg) } break in type: :subscription listeners_count += 1 Ractor.yield({ type: :skip }) in type: :data Ractor.yield({ type: :data, data: msg[:data] }) else end end end end def <<(data) @pipe.send({ type: :data, data: data }) end def close! @pipe.send({ type: :close }, move: true) end def subscribe @pipe.send({ type: :subscription }, move: true) loop do msg = @pipe.take case msg in type: :close return in type: :data yield(msg[:data]) else end end rescue Ractor::ClosedError # rubocop:disable Lint/SuppressedException end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
ractor_pool-0.1.1 | lib/ractor_pool/channel.rb |
ractor_pool-0.1.0 | lib/ractor_pool/channel.rb |