Sha256: 8c88c79bcf0938ad500b5d3be08ceaa587bfaf1aab29a0cfb347731da5f22227
Contents?: true
Size: 1.87 KB
Versions: 1
Compression:
Stored size: 1.87 KB
Contents
require 'amqp' #TODO: RabbitMQ connection options module RabbitRPC class Connection PREFETCH_DEFAULT = 5 include Logging attr_reader :queue_name, :uri, :opts, :prefetch def initialize(queue_name, uri, prefetch, opts = {}) @queue_name = queue_name @uri = uri @opts = opts @prefetch = prefetch || PREFETCH_DEFAULT end def listen! EventMachine.run do close_connection_on_interrupt subscribe_to_queue end end private def subscribe_to_queue queue.subscribe do |metadata, payload| EM.defer do request_handler = RequestHandler.new(payload) response_message = request_handler.execute unless request_handler.one_way channel.default_exchange.publish( response_message.to_msgpack, routing_key: metadata.reply_to, correlation_id: metadata.message_id, mandatory: true ) end end end end def connect! connection_params = ::AMQP::Client.parse_connection_uri @uri connection_params.merge! @opts logger.info 'Connecting to RabbitMQ' @connection ||= ::AMQP.connect connection_params end def channel logger.info 'Establishng connection with channel' @channel ||= ::AMQP::Channel.new connect!, prefetch: @prefetch end # Private - Establish connection with a RabbitMQ queue. # TODO: Queue options need to be provided def queue logger.info 'Connecting to queue' @queue ||= channel.queue @queue_name end def close_connection_on_interrupt %w[INT TERM].each do |interrupt_type| Signal.trap(interrupt_type) do logger.info 'Exiting' @connection.close do EventMachine.stop { exit } end end end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
rabbit_rpc-0.0.2 | lib/rabbit_rpc/connection.rb |