Sha256: 7d9ab126264c5aab309d97ecca30bc1e230ed6b7625cd02505a62292a1a26ba5

Contents?: true

Size: 999 Bytes

Versions: 2

Compression:

Stored size: 999 Bytes

Contents

# frozen_string_literal: true

# A channel backed by a single relay Ractor. Values are pushed in with #<<,
# consumed inside #subscribe, and #close! asks the relay to shut down after
# it has notified every subscription it counted.
class RactorPool::Channel
  # Starts the relay Ractor and stores it in @pipe.
  #
  # The relay reacts to three kinds of incoming message hashes:
  # * +:data+         - re-yields the payload wrapped in a fresh +:data+ message
  # * +:subscription+ - counts one more listener and yields a +:skip+ message
  # * +:close+        - yields the close message once per counted listener,
  #   then leaves its loop
  # Any other message is ignored.
  def initialize # rubocop:disable Metrics/MethodLength
    @pipe = Ractor.new do
      subscribers = 0
      loop do
        incoming = Ractor.receive
        case incoming
        in { type: :data }
          Ractor.yield({ type: :data, data: incoming[:data] })
        in { type: :subscription }
          subscribers += 1
          Ractor.yield({ type: :skip })
        in { type: :close }
          subscribers.times { Ractor.yield(incoming) }
          break
        else
          # Unrecognised message: nothing to do.
        end
      end
    end
  end

  # Sends +data+ to the relay wrapped in a +:data+ message.
  #
  # @param data [Object] the payload to publish
  # @return whatever Ractor#send returns
  def <<(data)
    envelope = { type: :data, data: data }
    @pipe.send(envelope)
  end

  # Sends a +:close+ message to the relay, moving the message instead of
  # copying it.
  def close!
    shutdown = { type: :close }
    @pipe.send(shutdown, move: true)
  end

  # Registers a subscription with the relay, then keeps taking messages
  # from it:
  # * +:data+ messages have their payload passed to the given block
  # * a +:close+ message ends the loop
  # * anything else (for example +:skip+) is ignored
  #
  # A Ractor::ClosedError raised anywhere inside this method is swallowed,
  # so the method simply returns nil in that case.
  #
  # @yieldparam data [Object] the payload of each +:data+ message taken
  # @return [nil]
  def subscribe
    registration = { type: :subscription }
    @pipe.send(registration, move: true)
    loop do
      received = @pipe.take
      case received
      in { type: :data }
        yield(received[:data])
      in { type: :close }
        break
      else
        # Not for us to act on: keep waiting.
      end
    end
  rescue Ractor::ClosedError # rubocop:disable Lint/SuppressedException
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
ractor_pool-0.1.1 lib/ractor_pool/channel.rb
ractor_pool-0.1.0 lib/ractor_pool/channel.rb