lib/splash/transports/rabbitmq.rb in prometheus-splash-0.3.0 vs lib/splash/transports/rabbitmq.rb in prometheus-splash-0.4.0

- old
+ new

@@ -9,13 +9,19 @@ def_delegators :@queue, :subscribe def initialize(options = {}) @config = get_config.transports + @url = "amqp://" host = @config[:rabbitmq][:host] port = @config[:rabbitmq][:port] - @url = "amqp://#{host}:#{port}" + vhost = @config[:rabbitmq][:vhost] + if @config[:rabbitmq][:user] and @config[:rabbitmq][:passwd] then + creds = "#{@config[:rabbitmq][:user]}:#{@config[:rabbitmq][:passwd]}@" + @url << creds + end + @url << "#{host}:#{port}#{vhost}" begin @connection = Bunny.new url: @url @connection.start @channel = @connection.create_channel @queue = @channel.queue options[:queue] @@ -29,16 +35,23 @@ class Client include Splash::Config include Splash::Transports + include Splash::Loggers def initialize @config = get_config.transports + @url = "amqp://" host = @config[:rabbitmq][:host] port = @config[:rabbitmq][:port] - @url = "amqp://#{host}:#{port}" + vhost = @config[:rabbitmq][:vhost] + if @config[:rabbitmq][:user] and @config[:rabbitmq][:passwd] then + creds = "#{@config[:rabbitmq][:user]}:#{@config[:rabbitmq][:passwd]}@" + @url << creds + end + @url << "#{host}:#{port}#{vhost}" begin @connection = Bunny.new url: @url @connection.start @channel = @connection.create_channel rescue Bunny::Exception @@ -67,9 +80,10 @@ get_default_subscriber(queue: queue).subscribe do |delivery_info, properties, payload| res = YAML::load(payload) lock.synchronize { condition.signal } end + get_logger.send "Verb : #{order[:verb].to_s} to queue : #{order[:queue]}." get_default_client.publish queue: order[:queue], message: order.to_yaml lock.synchronize { condition.wait(lock) } return res end