lib/right_speed/processor.rb in right_speed-0.1.0 vs lib/right_speed/processor.rb in right_speed-0.2.0

- old
+ new

@@ -1,11 +1,12 @@ # frozen_string_literal: true require 'rack/builder' require_relative 'worker/accepter' -require_relative 'worker/reader' +require_relative 'worker/fair' +require_relative 'worker/roundrobin' require_relative 'connection_closer' module RightSpeed module Processor def self.setup(app:, worker_type:, workers:) @@ -16,12 +17,14 @@ else raise "Unexpected app #{app}" end handler = Ractor.make_shareable(Handler.new(app)) case worker_type - when :read - ReadProcessor.new(workers, handler) + when :roundrobin + RoundRobinProcessor.new(workers, handler) + when :fair + FairProcessor.new(workers, handler) when :accept AcceptProcessor.new(workers, handler) else raise "Unknown worker type #{worker_type}" end @@ -61,15 +64,15 @@ # ractors.each{|r| r.take} # finalizer.close rescue nil end end - class ReadProcessor < Base + class RoundRobinProcessor < Base def initialize(workers, handler) @worker_num = workers @handler = handler - @workers = workers.times.map{|i| Worker::Reader.new(id: i, handler: @handler)} + @workers = workers.times.map{|i| Worker::RoundRobin.new(id: i, handler: @handler)} @closer = ConnectionCloser.new @counter = 0 end def configure(listener:) @@ -87,35 +90,62 @@ @workers[current % @worker_num].process(conn) end def wait @workers.each{|w| w.wait} + @closer.wait end end + class FairProcessor < Base + def initialize(workers, handler) + @worker_num = workers + @handler = handler + @workers = workers.times.map{|i| Worker::Fair.new(id: i, handler: @handler)} + @closer = ConnectionCloser.new + end + + def configure(listener:) + @listener = listener + end + + def run + @listener.run(self) + @workers.each{|w| w.run(@listener.ractor)} + @closer.run(@workers.map{|w| w.ractor}) + end + + def process(conn) + Ractor.yield(conn, move: true) + end + + def wait + # listener, workers are using those outgoing to pass connections + @closer.wait + end + end + class AcceptProcessor < Base def initialize(workers, handler) @worker_num = workers @handler = handler @workers = workers.times.map{|i| Worker::Accepter.new(id: i, handler: @handler) } + @closer = ConnectionCloser.new end def configure(listener:) @listener = listener - @workers.each do |w| - w.configure(listener.sock) - end end def run - @workers.each do |w| - w.run - end - # TODO: connection closer + @listener.run + @workers.each{|w| w.run(@listener.sock)} + @closer.run(@workers.map{|w| w.ractor}) end def wait - @workers.each{|w| w.wait} + # workers are using those outgoing to pass connections + @closer.wait end end end end