lib/fluffle/server.rb in fluffle-0.6.3 vs lib/fluffle/server.rb in fluffle-0.7.0

- old
+ new

@@ -1,17 +1,25 @@ +require 'concurrent' +require 'oj' + module Fluffle class Server include Connectable - attr_reader :connection, :handlers, :handler_pool + attr_reader :confirms, :connection, :handlers, :handler_pool + attr_accessor :publish_timeout # url: - Optional URL to pass to `Bunny.new` to immediately connect # concurrency: - Number of threads to handle messages on (default: 1) - def initialize(url: nil, connection: nil, concurrency: 1) + # confirms: - Whether or not to use RabbitMQ confirms + def initialize(url: nil, connection: nil, concurrency: 1, confirms: false) url_or_connection = url || connection self.connect(url_or_connection) if url_or_connection + @confirms = confirms + @publish_timeout = 5 + @handlers = {} @handler_pool = Concurrent::FixedThreadPool.new concurrency self.class.default_server ||= self end @@ -25,17 +33,26 @@ raise ArgumentError, 'Cannot provide both handler: and block' end handler = Fluffle::Handlers::Dispatcher.new(&block) if block + raise ArgumentError, 'Handler cannot be nil' if handler.nil? + @handlers[queue.to_s] = handler end def start + @handlers.freeze + @channel = @connection.create_channel @exchange = @channel.default_exchange + if confirms + @confirmer = Fluffle::Confirmer.new channel: @channel + @confirmer.confirm_select + end + raise 'No handlers defined' if @handlers.empty? @handlers.each do |name, handler| qualified_name = Fluffle.request_queue_name name queue = @channel.queue qualified_name @@ -100,11 +117,21 @@ 'id' => id, 'error' => self.build_error_response(err) } end - @exchange.publish Oj.dump(response), routing_key: reply_to, - correlation_id: response['id'] + stack = Fluffle::MiddlewareStack.new + + if confirms + stack.push ->(publish) { + @confirmer.with_confirmation timeout: publish_timeout, &publish + } + end + + stack.call do + @exchange.publish Oj.dump(response), routing_key: reply_to, + correlation_id: response['id'] + end end # handler - Instance of a `Handler` that may receive `#call` # request - `Hash` representing a decoded Request def call_handler(handler:, request:)