lib/splash/transports/rabbitmq.rb in prometheus-splash-0.0.3 vs lib/splash/transports/rabbitmq.rb in prometheus-splash-0.1.0

- old
+ new

@@ -1,38 +1,55 @@ require 'bunny' +require 'yaml' +require 'forwardable' + module Splash module Transports - module RabbitMQ + module Rabbitmq class Subscriber - def initialize + include Splash::Config + extend Forwardable + def_delegators :@queue, :subscribe + + def initialize(options = {}) + @config = get_config.transports + @connection = Bunny.new url: @config[:rabbitmq][:url] + @connection.start + @channel = @connection.create_channel + @queue = @channel.queue options[:queue] + end + end class Client + include Splash::Config def initialize - @connection = Bunny.new + @config = get_config.transports + @connection = Bunny.new url: @config[:rabbitmq][:url] @connection.start @channel = @connection.create_channel end def publish(options ={}) return @channel.default_exchange.publish(options[:message], :routing_key => options[:queue]) end - def ack - return @channel.acknowledge(delivery_info.delivery_tag, false) + def ack(ack) + return @channel.acknowledge(ack, false) end def get(options ={}) queue = @channel.queue(options[:queue]) + opt = {}; opt[:manual_ack] = (options[:manual_ack])? true : false delivery_info, properties, payload = queue.pop res = {:message => payload} - res[:ack] = delivery_info.delivery_tag if options[:noack] + res[:ack] = delivery_info.delivery_tag if options[:manual_ack] return res end def close @connection.close