lib/right_speed/processor.rb in right_speed-0.1.0 vs lib/right_speed/processor.rb in right_speed-0.2.0
- old
+ new
@@ -1,11 +1,12 @@
# frozen_string_literal: true
require 'rack/builder'
require_relative 'worker/accepter'
-require_relative 'worker/reader'
+require_relative 'worker/fair'
+require_relative 'worker/roundrobin'
require_relative 'connection_closer'
module RightSpeed
module Processor
def self.setup(app:, worker_type:, workers:)
@@ -16,12 +17,14 @@
else
raise "Unexpected app #{app}"
end
handler = Ractor.make_shareable(Handler.new(app))
case worker_type
- when :read
- ReadProcessor.new(workers, handler)
+ when :roundrobin
+ RoundRobinProcessor.new(workers, handler)
+ when :fair
+ FairProcessor.new(workers, handler)
when :accept
AcceptProcessor.new(workers, handler)
else
raise "Unknown worker type #{worker_type}"
end
@@ -61,15 +64,15 @@
# ractors.each{|r| r.take}
# finalizer.close rescue nil
end
end
- class ReadProcessor < Base
+ class RoundRobinProcessor < Base
def initialize(workers, handler)
@worker_num = workers
@handler = handler
- @workers = workers.times.map{|i| Worker::Reader.new(id: i, handler: @handler)}
+ @workers = workers.times.map{|i| Worker::RoundRobin.new(id: i, handler: @handler)}
@closer = ConnectionCloser.new
@counter = 0
end
def configure(listener:)
@@ -87,35 +90,62 @@
@workers[current % @worker_num].process(conn)
end
def wait
@workers.each{|w| w.wait}
+ @closer.wait
end
end
+ class FairProcessor < Base
+ def initialize(workers, handler)
+ @worker_num = workers
+ @handler = handler
+ @workers = workers.times.map{|i| Worker::Fair.new(id: i, handler: @handler)}
+ @closer = ConnectionCloser.new
+ end
+
+ def configure(listener:)
+ @listener = listener
+ end
+
+ def run
+ @listener.run(self)
+ @workers.each{|w| w.run(@listener.ractor)}
+ @closer.run(@workers.map{|w| w.ractor})
+ end
+
+ def process(conn)
+ Ractor.yield(conn, move: true)
+ end
+
+ def wait
+ # listener, workers are using those outgoing to pass connections
+ @closer.wait
+ end
+ end
+
class AcceptProcessor < Base
def initialize(workers, handler)
@worker_num = workers
@handler = handler
@workers = workers.times.map{|i| Worker::Accepter.new(id: i, handler: @handler) }
+ @closer = ConnectionCloser.new
end
def configure(listener:)
@listener = listener
- @workers.each do |w|
- w.configure(listener.sock)
- end
end
def run
- @workers.each do |w|
- w.run
- end
- # TODO: connection closer
+ @listener.run
+ @workers.each{|w| w.run(@listener.sock)}
+ @closer.run(@workers.map{|w| w.ractor})
end
def wait
- @workers.each{|w| w.wait}
+ # workers are using those outgoing to pass connections
+ @closer.wait
end
end
end
end