Sha256: b5ae25c084e3a37316edd5351b6b0e26cfb53664b4c9156212463dd7053590df

Contents?: true

Size: 1.03 KB

Versions: 2

Compression:

Stored size: 1.03 KB

Contents

# frozen_string_literal: true

module Zapp
  # Manages and dispatches work to a pool of Zapp::Worker instances.
  #
  # A single "pipe" Ractor sits between the pool and its workers: everything
  # sent to the pipe is re-published with Ractor.yield. Each worker is handed
  # the pipe at construction time.
  class WorkerPool
    attr_reader(:pipe, :workers, :parallelism)

    # Builds the pipe Ractor and one worker per configured unit of parallelism.
    #
    # @param app the application object handed to every worker as `app_:`
    def initialize(app:)
      # Captured once so the reader reflects the size this pool was built with,
      # even if the global configuration changes afterwards.
      @parallelism = Zapp.config.parallelism

      # The block must not reference outer locals: Ractor blocks are isolated.
      @pipe = Ractor.new do
        loop do
          Ractor.yield(Ractor.receive)
        end
      end

      @workers = Array.new(@parallelism) do |index|
        Worker.new(pipe_: pipe, app_: app, index: index)
      end
    end

    # Sends a [context, shutdown] tuple through the pipe.
    # Workers are expected to stop their processing loop when shutdown is true
    # and to process the HTTP context otherwise (see Zapp::Worker).
    #
    # @param context the HTTP context to process (nil for a shutdown message)
    # @param shutdown [Boolean] whether this message is a shutdown request
    def process(context:, shutdown: false)
      # The message is moved into the pipe, so a copy of the context is sent
      # and the caller's own object stays usable.
      pipe.send([context.dup, shutdown], move: true)
    end

    # Finishes processing of all requests and shuts down workers.
    #
    # One shutdown message is sent per worker that was actually created, then
    # every worker is terminated. A Ractor::RemoteError raised by one worker
    # is ignored so that the remaining workers are still terminated.
    #
    # @return [void]
    def drain
      workers.size.times { process(context: nil, shutdown: true) }

      workers.each do |worker|
        worker.terminate
      rescue Ractor::RemoteError
        # Ignored: best-effort shutdown, keep terminating the other workers
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
zapp-0.1.1 lib/zapp/worker_pool.rb
zapp-0.1.0 lib/zapp/worker_pool.rb