lib/fluffle/server.rb in fluffle-0.1.1 vs lib/fluffle/server.rb in fluffle-0.2.0

- old
+ new

@@ -1,16 +1,18 @@ module Fluffle class Server include Connectable - attr_reader :connection, :handlers + attr_reader :connection, :handlers, :handler_pool - def initialize(url: nil) + # url: - Optional URL to pass to `Bunny.new` to immediately connect + # concurrency: - Number of threads to handle messages on (default: 1) + def initialize(url: nil, concurrency: 1) self.connect(url) if url - @handlers = {} - @queues = {} + @handlers = {} + @handler_pool = Concurrent::FixedThreadPool.new concurrency self.class.default_server ||= self end class << self @@ -29,42 +31,106 @@ def start @channel = @connection.create_channel @exchange = @channel.default_exchange + raise 'No handlers defined' if @handlers.empty? + @handlers.each do |name, handler| qualified_name = Fluffle.request_queue_name name queue = @channel.queue qualified_name - queue.subscribe do |delivery_info, properties, payload| - self.handle_request queue_name: name, - handler: handler, - delivery_info: delivery_info, - properties: properties, - payload: payload + queue.subscribe do |_delivery_info, properties, payload| + @handler_pool.post do + self.handle_request handler: handler, + properties: properties, + payload: payload + end end end - @channel.work_pool.join + self.wait_for_signal end - def handle_request(queue_name:, handler:, delivery_info:, properties:, payload:) - id = nil + # NOTE: Keeping this in its own method so its functionality can be more + # easily overwritten by `Fluffle::Testing`. + def wait_for_signal + signal_read, signal_write = IO.pipe + + %w[INT TERM].each do |signal| + Signal.trap(signal) do + signal_write.puts signal + end + end + + # Adapted from Sidekiq: + # https://github.com/mperham/sidekiq/blob/e634177/lib/sidekiq/cli.rb#L94-L97 + while io = IO.select([signal_read]) + readables = io.first + signal = readables.first.gets.strip + + Fluffle.logger.info "Received #{signal}; shutting down..." + @channel.work_pool.shutdown + + return + end + end + + def handle_request(handler:, properties:, payload:) reply_to = properties[:reply_to] + responses = [] + begin - id, method, params = self.decode payload + decoded = self.decode payload - validate_request method: method + requests = + if decoded.is_a? Hash + [ decoded ] # Single request + elsif decoded.is_a? Array + decoded # Batch request + else + raise Errors::InvalidRequestError.new('Payload was neither an Array nor an Object') + end + requests.each do |request| + response = self.call_handler handler: handler, + request: request + + responses << response + end + rescue => err + responses << { + 'jsonrpc' => '2.0', + 'id' => nil, + 'error' => self.build_error_response(err) + } + end + + responses.each do |response| + @exchange.publish Oj.dump(response), routing_key: reply_to, + correlation_id: response['id'] + end + end + + # handler - Instance of a `Handler` that may receive `#call` + # request - `Hash` representing a decoded Request + def call_handler(handler:, request:) + begin + # We don't yet know if it's valid, so we have to be as cautious as + # possible about getting the ID + id = begin request['id']; rescue; nil end + + self.validate_request request + result = handler.call id: id, - method: method, - params: params, - meta: { - reply_to: reply_to - } + method: request['method'], + params: request['params'], + meta: {} rescue => err + log_error(err) if Fluffle.logger.error? + error = self.build_error_response err end response = { 'jsonrpc' => '2.0', 'id' => id } @@ -72,31 +138,49 @@ response['error'] = error else response['result'] = result end - @exchange.publish Oj.dump(response), routing_key: reply_to, - correlation_id: id + response end - protected - + # Deserialize a JSON payload and extract its 3 members: id, method, params + # + # payload - `String` of the payload from the queue + # + # Returns a `Hash` from parsing the JSON payload (keys should be `String`) def decode(payload) - payload = Oj.load payload - - id = payload['id'] - method = payload['method'] - params = payload['params'] - - [id, method, params] + Oj.load payload end - # Raises if elements of the request do not comply with the spec + # Raises if elements of the request payload do not comply with the spec + # + # payload - Decoded `Hash` of the payload (`String` keys) def validate_request(request) - raise Errors::InvalidRequestError.new("Missing `method' Request object member") unless request[:method] + raise Errors::InvalidRequestError.new("Improperly formatted Request (expected `Hash', got `#{request.class}')") unless request && request.is_a?(Hash) + raise Errors::InvalidRequestError.new("Missing `method' Request object member") unless request['method'] end + protected + + # Logs a nicely-formmated error to `Fluffle.logger` with the class, + # message, and backtrace (if available) + def log_error(err) + backtrace = Array(err.backtrace).flatten.compact + + backtrace = + if backtrace.empty? + '' + else + prefix = "\n " + prefix + backtrace.join(prefix) + end + + message = "#{err.class}: #{err.message}#{backtrace}" + Fluffle.logger.error message + end + # Convert a Ruby error into a hash complying with the JSON-RPC spec # for `Error` response objects def build_error_response(err) if err.is_a? Errors::BaseError err.to_response @@ -105,10 +189,10 @@ { 'code' => -32601, 'message' => 'Method not found' } else response = { 'code' => 0, - 'message' => err.message + 'message' => "#{err.class}: #{err.message}" } response['data'] = err.data if err.respond_to? :data response