lib/fluffle/server.rb in fluffle-0.1.1 vs lib/fluffle/server.rb in fluffle-0.2.0
- old
+ new
@@ -1,16 +1,18 @@
module Fluffle
class Server
include Connectable
- attr_reader :connection, :handlers
+ attr_reader :connection, :handlers, :handler_pool
- def initialize(url: nil)
+ # url: - Optional URL to pass to `Bunny.new` to immediately connect
+ # concurrency: - Number of threads to handle messages on (default: 1)
+ def initialize(url: nil, concurrency: 1)
self.connect(url) if url
- @handlers = {}
- @queues = {}
+ @handlers = {}
+ @handler_pool = Concurrent::FixedThreadPool.new concurrency
self.class.default_server ||= self
end
class << self
@@ -29,42 +31,106 @@
def start
@channel = @connection.create_channel
@exchange = @channel.default_exchange
+ raise 'No handlers defined' if @handlers.empty?
+
@handlers.each do |name, handler|
qualified_name = Fluffle.request_queue_name name
queue = @channel.queue qualified_name
- queue.subscribe do |delivery_info, properties, payload|
- self.handle_request queue_name: name,
- handler: handler,
- delivery_info: delivery_info,
- properties: properties,
- payload: payload
+ queue.subscribe do |_delivery_info, properties, payload|
+ @handler_pool.post do
+ self.handle_request handler: handler,
+ properties: properties,
+ payload: payload
+ end
end
end
- @channel.work_pool.join
+ self.wait_for_signal
end
- def handle_request(queue_name:, handler:, delivery_info:, properties:, payload:)
- id = nil
+ # NOTE: Keeping this in its own method so its functionality can be more
+ # easily overwritten by `Fluffle::Testing`.
+ def wait_for_signal
+ signal_read, signal_write = IO.pipe
+
+ %w[INT TERM].each do |signal|
+ Signal.trap(signal) do
+ signal_write.puts signal
+ end
+ end
+
+ # Adapted from Sidekiq:
+ # https://github.com/mperham/sidekiq/blob/e634177/lib/sidekiq/cli.rb#L94-L97
+ while io = IO.select([signal_read])
+ readables = io.first
+ signal = readables.first.gets.strip
+
+ Fluffle.logger.info "Received #{signal}; shutting down..."
+ @channel.work_pool.shutdown
+
+ return
+ end
+ end
+
+ def handle_request(handler:, properties:, payload:)
reply_to = properties[:reply_to]
+ responses = []
+
begin
- id, method, params = self.decode payload
+ decoded = self.decode payload
- validate_request method: method
+ requests =
+ if decoded.is_a? Hash
+ [ decoded ] # Single request
+ elsif decoded.is_a? Array
+ decoded # Batch request
+ else
+ raise Errors::InvalidRequestError.new('Payload was neither an Array nor an Object')
+ end
+ requests.each do |request|
+ response = self.call_handler handler: handler,
+ request: request
+
+ responses << response
+ end
+ rescue => err
+ responses << {
+ 'jsonrpc' => '2.0',
+ 'id' => nil,
+ 'error' => self.build_error_response(err)
+ }
+ end
+
+ responses.each do |response|
+ @exchange.publish Oj.dump(response), routing_key: reply_to,
+ correlation_id: response['id']
+ end
+ end
+
+ # handler - Instance of a `Handler` that may receive `#call`
+ # request - `Hash` representing a decoded Request
+ def call_handler(handler:, request:)
+ begin
+ # We don't yet know if it's valid, so we have to be as cautious as
+ # possible about getting the ID
+ id = begin request['id']; rescue; nil end
+
+ self.validate_request request
+
result = handler.call id: id,
- method: method,
- params: params,
- meta: {
- reply_to: reply_to
- }
+ method: request['method'],
+ params: request['params'],
+ meta: {}
rescue => err
+ log_error(err) if Fluffle.logger.error?
+
error = self.build_error_response err
end
response = { 'jsonrpc' => '2.0', 'id' => id }
@@ -72,31 +138,49 @@
response['error'] = error
else
response['result'] = result
end
- @exchange.publish Oj.dump(response), routing_key: reply_to,
- correlation_id: id
+ response
end
- protected
-
+ # Deserialize a JSON payload and extract its 3 members: id, method, params
+ #
+ # payload - `String` of the payload from the queue
+ #
+ # Returns a `Hash` from parsing the JSON payload (keys should be `String`)
def decode(payload)
- payload = Oj.load payload
-
- id = payload['id']
- method = payload['method']
- params = payload['params']
-
- [id, method, params]
+ Oj.load payload
end
- # Raises if elements of the request do not comply with the spec
+ # Raises if elements of the request payload do not comply with the spec
+ #
+ # payload - Decoded `Hash` of the payload (`String` keys)
def validate_request(request)
- raise Errors::InvalidRequestError.new("Missing `method' Request object member") unless request[:method]
+ raise Errors::InvalidRequestError.new("Improperly formatted Request (expected `Hash', got `#{request.class}')") unless request && request.is_a?(Hash)
+ raise Errors::InvalidRequestError.new("Missing `method' Request object member") unless request['method']
end
+ protected
+
+ # Logs a nicely-formmated error to `Fluffle.logger` with the class,
+ # message, and backtrace (if available)
+ def log_error(err)
+ backtrace = Array(err.backtrace).flatten.compact
+
+ backtrace =
+ if backtrace.empty?
+ ''
+ else
+ prefix = "\n "
+ prefix + backtrace.join(prefix)
+ end
+
+ message = "#{err.class}: #{err.message}#{backtrace}"
+ Fluffle.logger.error message
+ end
+
# Convert a Ruby error into a hash complying with the JSON-RPC spec
# for `Error` response objects
def build_error_response(err)
if err.is_a? Errors::BaseError
err.to_response
@@ -105,10 +189,10 @@
{ 'code' => -32601, 'message' => 'Method not found' }
else
response = {
'code' => 0,
- 'message' => err.message
+ 'message' => "#{err.class}: #{err.message}"
}
response['data'] = err.data if err.respond_to? :data
response