lib/splash/transports/rabbitmq.rb in prometheus-splash-0.0.3 vs lib/splash/transports/rabbitmq.rb in prometheus-splash-0.1.0
- old
+ new
@@ -1,38 +1,55 @@
require 'bunny'
+require 'yaml'
+require 'forwardable'
+
module Splash
module Transports
- module RabbitMQ
+ module Rabbitmq
class Subscriber
- def initialize
+ include Splash::Config
+ extend Forwardable
+ def_delegators :@queue, :subscribe
+
+ def initialize(options = {})
+ @config = get_config.transports
+ @connection = Bunny.new url: @config[:rabbitmq][:url]
+ @connection.start
+ @channel = @connection.create_channel
+ @queue = @channel.queue options[:queue]
+
end
+
end
class Client
+ include Splash::Config
def initialize
- @connection = Bunny.new
+ @config = get_config.transports
+ @connection = Bunny.new url: @config[:rabbitmq][:url]
@connection.start
@channel = @connection.create_channel
end
def publish(options ={})
return @channel.default_exchange.publish(options[:message], :routing_key => options[:queue])
end
- def ack
- return @channel.acknowledge(delivery_info.delivery_tag, false)
+ def ack(ack)
+ return @channel.acknowledge(ack, false)
end
def get(options ={})
queue = @channel.queue(options[:queue])
+ opt = {}; opt[:manual_ack] = (options[:manual_ack])? true : false
delivery_info, properties, payload = queue.pop
res = {:message => payload}
- res[:ack] = delivery_info.delivery_tag if options[:noack]
+ res[:ack] = delivery_info.delivery_tag if options[:manual_ack]
return res
end
def close
@connection.close