lib/zapp/worker/request_processor.rb in zapp-0.2.1 vs lib/zapp/worker/request_processor.rb in zapp-0.2.2
- old
+ new
@@ -2,59 +2,55 @@
module Zapp
class Worker < Ractor
# Processes HTTP requests
class RequestProcessor
- attr_reader(:app, :config, :socket_pipe_sender, :context_pipe)
+ attr_reader(:socket_pipe_sender, :context_pipe)
- def initialize(context_pipe:, socket_pipe:, app:, config:)
- @app = app
- @config = config
+ def initialize(context_pipe:, socket_pipe:)
@socket_pipe_sender = Zapp::SocketPipe::Sender.new(pipe: socket_pipe)
@context_pipe = context_pipe
end
def loop
while (context = context_pipe.take)
if context == Zapp::WorkerPool::SIGNALS[:EXIT]
- logger.trace("Received exit signal, shutting down")
- thread_pool.shutdown
+ Zapp::Logger.trace("Received exit signal, shutting down")
+ shutdown
break
end
+ process = lambda {
+ process(context: context)
+ }
- process = lambda {
- process(context: context)
- }
+ if Zapp.config.log_requests
+ log_request_time(context: context, &process)
+ else
+ process.call
+ end
- if config.log_requests
- log_request_time(context: context, &process)
- else
- process.call
- end
-
- # We send sockets that the client hasn't closed yet,
- # back to the main ractor for HTTP request parsing again
- socket_pipe_sender.push(context.socket) unless context.client_closed?
-
+ # We send sockets that the client hasn't closed yet,
+ # back to the main ractor for HTTP request parsing again
+ socket_pipe_sender.push(context.socket) unless context.client_closed?
end
end
private
# Processes an HTTP request
def process(context:)
- env = prepare_env(data: context.req.data, body: context.req.body, env: config.env.dup)
+ env = prepare_env(data: context.req.data, body: context.req.body, env: Zapp.config.env.dup)
- status, headers, response_body_stream = @app.call(env)
+ status, headers, response_body_stream = Zapp.config.app.call(env)
response_body = body_stream_to_string(response_body_stream)
context.res.write(data: response_body, status: status, headers: headers)
rescue StandardError => e
context.res.write(data: "An unexpected error occurred", status: 500, headers: {})
- logger.error("#{e}\n\n#{e.backtrace&.join(",\n")}") if config.log_uncaught_errors
+ Zapp::Logger.error("#{e}\n\n#{e.backtrace&.join(",\n")}") if Zapp.config.log_uncaught_errors
end
# Merges HTTP data and body into the env to be passed to the rack app
def prepare_env(data:, body:, env:)
data["QUERY_STRING"] = ""
@@ -77,11 +73,11 @@
request_time = ((Time.now.to_f * 1000) - start).truncate(2)
method = context.req.data["REQUEST_METHOD"]
path = context.req.data["PATH_INFO"]
status = context.res.status
- logger.info(
+ Zapp::Logger.info(
"#{method} #{path} - Completed in #{request_time}ms with status #{status}"
)
end
# Loops over a body stream and returns a single string
@@ -94,21 +90,20 @@
stream.close if stream.respond_to?(:close)
response_body
end
+ def shutdown
+ Zapp::Logger.flush
+ thread_pool.shutdown
+ end
+
def thread_pool
@thread_pool ||= Concurrent::ThreadPoolExecutor.new(
- min_threads: config.threads_per_worker,
- max_threads: config.threads_per_worker,
- max_queue: 1000,
+ min_threads: Zapp.config.threads_per_worker,
+ max_threads: Zapp.config.threads_per_worker,
+ max_queue: 1000
)
- end
-
- def logger
- @logger ||= config.logger_class.new do |l|
- l.prefix = Ractor.current.name
- end
end
end
end
end