lib/splash/transports/rabbitmq.rb in prometheus-splash-0.1.1 vs lib/splash/transports/rabbitmq.rb in prometheus-splash-0.2.0
- old
+ new
@@ -9,37 +9,70 @@
def_delegators :@queue, :subscribe
def initialize(options = {})
@config = get_config.transports
- @connection = Bunny.new url: @config[:rabbitmq][:url]
- @connection.start
- @channel = @connection.create_channel
- @queue = @channel.queue options[:queue]
-
+ host = @config[:rabbitmq][:host]
+ port = @config[:rabbitmq][:port]
+ @url = "amqp://#{host}:#{port}"
+ begin
+ @connection = Bunny.new url: @url
+ @connection.start
+ @channel = @connection.create_channel
+ @queue = @channel.queue options[:queue]
+ rescue Bunny::Exception
+ return { :case => :service_dependence_missing, :more => "RabbitMQ Transport not available." }
+ end
end
+
end
class Client
include Splash::Config
+ include Splash::Transports
+
def initialize
@config = get_config.transports
- @connection = Bunny.new url: @config[:rabbitmq][:url]
- @connection.start
- @channel = @connection.create_channel
+ host = @config[:rabbitmq][:host]
+ port = @config[:rabbitmq][:port]
+ @url = "amqp://#{host}:#{port}"
+ begin
+ @connection = Bunny.new url: @url
+ @connection.start
+ @channel = @connection.create_channel
+ rescue Bunny::Exception
+ return { :case => :service_dependence_missing, :more => "RabbitMQ Transport not available." }
+ end
end
+ def purge(options)
+ @channel.queue(options[:queue]).purge
+ end
+
def publish(options ={})
return @channel.default_exchange.publish(options[:message], :routing_key => options[:queue])
end
def ack(ack)
return @channel.acknowledge(ack, false)
end
+ def execute(order)
+ queue = order[:return_to]
+ lock = Mutex.new
+ res = nil
+ condition = ConditionVariable.new
+ get_default_subscriber(queue: queue).subscribe(timeout: 5) do |delivery_info, properties, payload|
+ res = YAML::load(payload)
+ lock.synchronize { condition.signal }
+ end
+ get_default_client.publish queue: order[:queue], message: order.to_yaml
+ lock.synchronize { condition.wait(lock) }
+ return res
+ end
def get(options ={})
queue = @channel.queue(options[:queue])
opt = {}; opt[:manual_ack] = (options[:manual_ack])? true : false
delivery_info, properties, payload = queue.pop