require "timeout"
require "bunny"

# Publishes messages to RabbitMQ queues through the Bunny client and can
# optionally block waiting for a single reply on a temporary reply queue.
#
# A fresh broker connection is opened for every #publish call and closed
# again when the call finishes.
class SidekickQueuePublisher
  # Host and port used when the :broker config entry does not supply them.
  DEFAULT_BROKER_HOST = "localhost"
  DEFAULT_BROKER_PORT = 5672

  # Maximum number of seconds #publish waits for a message on the reply queue.
  REPLY_TIMEOUT_SECONDS = 5

  # Server-generated queue names begin with this prefix; messages addressed
  # to them are dropped (see #publish).
  GENERATED_QUEUE_PREFIX = "amq.gen"

  attr_accessor :logger

  # config - Hash-like object responding to #with_indifferent_access.
  #          Keys read by this class: :broker and :timeout.
  # logger - object responding to #info, #warn, #error and
  #          #log_connection_error_and_sleep (the latter is defined outside
  #          this file; presumably it logs the error and pauses before the
  #          caller retries - confirm against the logger implementation).
  #
  # Raises SyntaxError when the deprecated :brokers key is present.
  def initialize(config, logger)
    @logger = logger
    @config = config.with_indifferent_access

    # Deprecations section
    if @config[:brokers]
      raise SyntaxError, "ERROR: 'brokers' is a deprecated config value. Please use something like:\n:broker:\n :ip: 127.0.0.1\n :port: 1234"
    end
  end

  # Publish a message using a given routing key.
  # If a temp reply queue name is supplied, this method
  # will automatically subscribe to the reply queue
  # and block until a message is received (or the reply timeout elapses).
  #
  # message               - payload handed to the queue's #publish.
  # routing_key           - name of the destination queue.
  # temp_reply_queue_name - optional name of a reply queue to create, wait
  #                         on, and delete afterwards.
  #
  # Returns true when the message was published without waiting for a reply
  # or was dropped because it targets an amq.gen queue; otherwise returns the
  # reply payload, or an ":error: ..." string when no reply arrived in time.
  def publish(message, routing_key, temp_reply_queue_name=nil)
    # Decide whether to drop the message *before* connecting, so a dropped
    # message never opens a connection or leaves a temp reply queue behind.
    if routing_key.to_s.start_with?(GENERATED_QUEUE_PREFIX)
      logger.warn "Cannot publish messages to queues starting with amq.gen - this must have been an old request and will be dropped."
      return true
    end

    with_bunny_connection(temp_reply_queue_name) do |reply_queue|
      logger.info "Publishing message to #{routing_key}: #{message}"
      @b.queue(routing_key).publish(message, :persistent => true)

      if reply_queue
        response = await_reply(reply_queue)
        delete_reply_queue(reply_queue)
        response
      else
        true
      end
    end
  end

  private

  # Polls the reply queue until a message arrives or REPLY_TIMEOUT_SECONDS
  # elapse. Returns the message payload, or an ":error: ..." string on timeout.
  def await_reply(reply_queue)
    Timeout.timeout(REPLY_TIMEOUT_SECONDS) do
      loop do
        msg = reply_queue.pop(:ack => true)
        msg = msg[:payload] if msg.is_a?(Hash) # Support older implementations of bunny lib
        next if msg == :queue_empty # nothing there yet - keep polling

        logger.info "Got msg #{msg}"
        break msg
      end
    end
  rescue Timeout::Error
    logger.error "Timed out while waiting for response from server."
    ":error: Timed out while waiting for response, messaging server may be down."
  end

  # Best-effort removal of the temporary reply queue; a failure is only logged.
  def delete_reply_queue(reply_queue)
    reply_queue.delete
  rescue StandardError
    logger.warn "Error deleting reply queue #{reply_queue.inspect}"
  end

  # Opens a connection, yields the reply queue (nil when no name was given)
  # and returns the block's value. Any StandardError raised while connecting
  # or inside the block is reported through the logger and the whole
  # operation is retried with a fresh connection. The connection is always
  # closed on the way out.
  def with_bunny_connection(temp_reply_queue_name=nil, &block)
    begin
      temp_queue = init_bunny_connection(temp_reply_queue_name)
      block.call(temp_queue)
    rescue StandardError => e
      # `ensure` does not run on `retry`, so release the failed connection
      # explicitly to avoid leaking one connection per attempt.
      close_bunny_connection
      logger.log_connection_error_and_sleep(e)
      retry
    ensure
      close_bunny_connection
    end
  end

  # Connects to the configured broker, storing the connection in @b.
  # When temp_reply_queue_name is given, declares that queue, binds it to an
  # exchange of the same name using the name as key, and returns it;
  # otherwise returns nil. Connection failures are reported through the
  # logger and retried until they succeed.
  def init_bunny_connection(temp_reply_queue_name=nil)
    @broker = @config[:broker]
    host, port = broker_endpoint(@broker)
    logger.info "Establishing connection to RabbitMQ broker #{@broker.inspect}."
    @b = Bunny.new(:host => host, :port => port, :timeout => @config[:timeout])
    @b.start

    return nil unless temp_reply_queue_name

    reply_queue = @b.queue(temp_reply_queue_name)
    reply_queue.bind(@b.exchange(temp_reply_queue_name), :key => temp_reply_queue_name)
    reply_queue
  rescue StandardError => e
    # Drop a half-initialised connection before trying again.
    close_bunny_connection
    logger.log_connection_error_and_sleep(e)
    retry
  end

  # Resolves [host, port] from the :broker config entry.
  # Accepts the hash form shown in the deprecation message
  # ({:ip => ..., :port => ...}) as well as the positional [host, port] form,
  # and falls back to the defaults for anything missing.
  def broker_endpoint(broker)
    host, port =
      case broker
      when nil  then [nil, nil]
      when Hash then [broker[:ip], broker[:port]]
      else           [broker[0], broker[1]]
      end

    [host || DEFAULT_BROKER_HOST, port || DEFAULT_BROKER_PORT]
  end

  # Stops the current connection, if any. Errors while stopping are ignored
  # on purpose: there is nothing useful to do about them at this point.
  def close_bunny_connection
    @b.stop if @b
  rescue StandardError
    nil
  ensure
    @b = nil
  end
end