lib/fluffle/server.rb in fluffle-0.6.3 vs lib/fluffle/server.rb in fluffle-0.7.0
- old
+ new
@@ -1,17 +1,25 @@
+require 'concurrent'
+require 'oj'
+
module Fluffle
class Server
include Connectable
- attr_reader :connection, :handlers, :handler_pool
+ attr_reader :confirms, :connection, :handlers, :handler_pool
+ attr_accessor :publish_timeout
# url: - Optional URL to pass to `Bunny.new` to immediately connect
# concurrency: - Number of threads to handle messages on (default: 1)
- def initialize(url: nil, connection: nil, concurrency: 1)
+ # confirms: - Whether or not to use RabbitMQ confirms
+ def initialize(url: nil, connection: nil, concurrency: 1, confirms: false)
url_or_connection = url || connection
self.connect(url_or_connection) if url_or_connection
+ @confirms = confirms
+ @publish_timeout = 5
+
@handlers = {}
@handler_pool = Concurrent::FixedThreadPool.new concurrency
self.class.default_server ||= self
end
@@ -25,17 +33,26 @@
raise ArgumentError, 'Cannot provide both handler: and block'
end
handler = Fluffle::Handlers::Dispatcher.new(&block) if block
+ raise ArgumentError, 'Handler cannot be nil' if handler.nil?
+
@handlers[queue.to_s] = handler
end
def start
+ @handlers.freeze
+
@channel = @connection.create_channel
@exchange = @channel.default_exchange
+ if confirms
+ @confirmer = Fluffle::Confirmer.new channel: @channel
+ @confirmer.confirm_select
+ end
+
raise 'No handlers defined' if @handlers.empty?
@handlers.each do |name, handler|
qualified_name = Fluffle.request_queue_name name
queue = @channel.queue qualified_name
@@ -100,11 +117,21 @@
'id' => id,
'error' => self.build_error_response(err)
}
end
- @exchange.publish Oj.dump(response), routing_key: reply_to,
- correlation_id: response['id']
+ stack = Fluffle::MiddlewareStack.new
+
+ if confirms
+ stack.push ->(publish) {
+ @confirmer.with_confirmation timeout: publish_timeout, &publish
+ }
+ end
+
+ stack.call do
+ @exchange.publish Oj.dump(response), routing_key: reply_to,
+ correlation_id: response['id']
+ end
end
# handler - Instance of a `Handler` that may receive `#call`
# request - `Hash` representing a decoded Request
def call_handler(handler:, request:)