Sha256: 54ea6c1350592be2188686de0849dfef69bd886b8b91fea6b682e3a207caa052

Contents?: true

Size: 1.63 KB

Versions: 3

Compression:

Stored size: 1.63 KB

Contents

# frozen_string_literal: true

require 'bundler/setup'
require 'polyphony'

# Based on the design of Elixir's GenStage

class Producer
  def initialize(mod, *a, **b)
    extend(mod)
    setup(*a, **b)
    @_fiber = spin do
      receive_loop do |msg|
        case msg[:kind]
        when :demand
          items = handle_demand(msg[:limit])
          msg[:peer] << items
        end
      end
    end
  end

  def <<(msg)
    @_fiber << msg
  end
end

module Counter
  def setup(counter = 0)
    @counter = counter
  end

  def handle_demand(demand)
    events = (@counter...@counter + demand).to_a
    @counter += demand
    events
  end
end

counter = Producer.new(Counter, 0)

class Consumer
  def initialize(mod, *a, **b)
    extend(mod)
    setup(*a, **b) if respond_to?(:setup)
    @_fiber = spin do
      while true
        items = get_items
        handle_items(items)
      end
    end

    @max_demand = 10
    @min_demand = 5
  end

  def subscribe(upstream)
    @upstream = upstream
  end

  private

  def get_items
    send_demand(@max_demand) if !@sent_demand
    items = receive
    send_demand(@min_demand)
    items
  end

  def send_demand(demand)
    if @upstream
      @upstream << { peer: Fiber.current, kind: :demand, limit: demand }
      @sent_demand = true
    else
      sleep 0.1
    end
  end
end

module Printer
  def handle_items(items)
    sleep 1
    puts "got: #{items.join(' ')}"
  end
end

# counter << { peer: Fiber.current, kind: :demand, limit: 10 }
# r = receive

# p r: r

# counter << { peer: Fiber.current, kind: :demand, limit: 10 }
# r = receive
# p r: r

printer = Consumer.new(Printer)
printer.subscribe(counter)

sleep

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
polyphony-1.6 examples/core/stages.rb
polyphony-1.5 examples/core/stages.rb
polyphony-1.4 examples/core/stages.rb