lib/zapp/server.rb in zapp-0.1.1 vs lib/zapp/server.rb in zapp-0.2.1

- old
+ new

@@ -1,55 +1,75 @@ # frozen_string_literal: true module Zapp # The Zap HTTP Server, listens on a TCP connection and processes incoming requests class Server - attr_reader(:tcp_connection, :worker_pool) + attr_reader(:worker_pool, :socket_pipe_receiver) def initialize - @tcp_connection = TCPServer.new(Zapp.config.host, Zapp.config.port) - @worker_pool = Zapp::WorkerPool.new(app: Zapp.config.app) + @socket_pipe = Zapp::Pipe.new + @context_pipe = Zapp::Pipe.new + + @socket_pipe_receiver = Zapp::SocketPipe::Receiver.new(pipe: @socket_pipe) + + @worker_pool = Zapp::WorkerPool.new(app: Zapp.config.app, socket_pipe: @socket_pipe, context_pipe: @context_pipe) end def run - parser = Puma::HttpParser.new - log_start loop do - socket = tcp_connection.accept + socket = socket_pipe_receiver.take + next if socket.eof? - context = Zapp::HTTPContext::Context.new(socket: socket) + parsing_thread_pool.post do + ctx = Zapp::HTTPContext::Context.new(socket: socket) - context.req.parse!(parser: parser) - - worker_pool.process(context: context) - - rescue Puma::HttpParserError => e - context.res.write(data: "Invalid HTTP request", status: 500, headers: {}) - Zapp::Logger.warn("Puma parser error: #{e}") + worker_pool.process(context: ctx) unless ctx.client_closed? # Parsing failed + end + rescue Errno::ECONNRESET + next end rescue SignalException, IRB::Abort => e shutdown(e) end def shutdown(err = nil) - Zapp::Logger.info("Received signal #{err}") unless err.nil? + Zapp::Logger.info("Received signal #{err.class.name}") unless err.nil? Zapp::Logger.info("Gracefully shutting down workers, allowing request processing to finish") + socket_pipe_receiver.drain worker_pool.drain Zapp::Logger.info("Done. See you next time!") end private def log_start - Zapp::Logger.info("Zap version: #{Zapp::VERSION}") + Zapp::Logger.info(" + ⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡ + ⚡ ███████╗ █████╗ ██████╗ ██████╗ ⚡ + ⚡ ╚══███╔╝██╔══██╗██╔══██╗██╔══██╗ ⚡ + ⚡ ███╔╝ ███████║██████╔╝██████╔╝ ⚡ + ⚡ ███╔╝ ██╔══██║██╔═══╝ ██╔═══╝ ⚡ + ⚡ ███████╗██║ ██║██║ ██║ ⚡ + ⚡ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝ ⚡ + ⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡⚡ +") + Zapp::Logger.info("Zapp version: #{Zapp::VERSION}") Zapp::Logger.info("Environment: #{Zapp.config.mode}") Zapp::Logger.info("Serving: #{Zapp.config.env[Rack::RACK_URL_SCHEME]}://#{Zapp.config.host}:#{Zapp.config.port}") Zapp::Logger.info("Parallel workers: #{Zapp.config.parallelism}") Zapp::Logger.info("Ready to accept requests") + end + + def parsing_thread_pool + @parsing_thread_pool ||= Concurrent::ThreadPoolExecutor.new( + min_threads: Zapp.config.parallelism, + max_threads: Zapp.config.parallelism, + max_queue: Zapp.config.parallelism * 1_000 + ) end end end