lib/splash/transports/rabbitmq.rb in prometheus-splash-0.3.0 vs lib/splash/transports/rabbitmq.rb in prometheus-splash-0.4.0
- old
+ new
@@ -9,13 +9,19 @@
def_delegators :@queue, :subscribe
def initialize(options = {})
@config = get_config.transports
+ @url = "amqp://"
host = @config[:rabbitmq][:host]
port = @config[:rabbitmq][:port]
- @url = "amqp://#{host}:#{port}"
+ vhost = @config[:rabbitmq][:vhost]
+ if @config[:rabbitmq][:user] and @config[:rabbitmq][:passwd] then
+ creds = "#{@config[:rabbitmq][:user]}:#{@config[:rabbitmq][:passwd]}@"
+ @url << creds
+ end
+ @url << "#{host}:#{port}#{vhost}"
begin
@connection = Bunny.new url: @url
@connection.start
@channel = @connection.create_channel
@queue = @channel.queue options[:queue]
@@ -29,16 +35,23 @@
class Client
include Splash::Config
include Splash::Transports
+ include Splash::Loggers
def initialize
@config = get_config.transports
+ @url = "amqp://"
host = @config[:rabbitmq][:host]
port = @config[:rabbitmq][:port]
- @url = "amqp://#{host}:#{port}"
+ vhost = @config[:rabbitmq][:vhost]
+ if @config[:rabbitmq][:user] and @config[:rabbitmq][:passwd] then
+ creds = "#{@config[:rabbitmq][:user]}:#{@config[:rabbitmq][:passwd]}@"
+ @url << creds
+ end
+ @url << "#{host}:#{port}#{vhost}"
begin
@connection = Bunny.new url: @url
@connection.start
@channel = @connection.create_channel
rescue Bunny::Exception
@@ -67,9 +80,10 @@
get_default_subscriber(queue: queue).subscribe do |delivery_info, properties, payload|
res = YAML::load(payload)
lock.synchronize { condition.signal }
end
+ get_logger.send "Verb : #{order[:verb].to_s} to queue : #{order[:queue]}."
get_default_client.publish queue: order[:queue], message: order.to_yaml
lock.synchronize { condition.wait(lock) }
return res
end