Sha256: 4db02c4911529a6413486b779ee8eec6e5795ac76c661a02a3bfe1e3cf0b3a29

Contents?: true

Size: 1.41 KB

Versions: 1

Compression:

Stored size: 1.41 KB

Contents

# coding: utf-8
module Splash
  module Transports
    module Rabbitmq

      class Subscriber
        include Splash::Config
        extend Forwardable

        def_delegators :@queue, :subscribe

        def initialize(options = {})
          @config = get_config.transports
          @connection = Bunny.new url: @config[:rabbitmq][:url]
          @connection.start
          @channel = @connection.create_channel
          @queue    = @channel.queue options[:queue]

        end

      end


      class Client
        include Splash::Config
        def initialize
          @config = get_config.transports
          @connection = Bunny.new url: @config[:rabbitmq][:url]
          @connection.start
          @channel = @connection.create_channel
        end


        def publish(options ={})
          return @channel.default_exchange.publish(options[:message], :routing_key => options[:queue])
        end

        def ack(ack)
          return @channel.acknowledge(ack, false)
        end


        def get(options ={})
          queue = @channel.queue(options[:queue])
          opt = {}; opt[:manual_ack] = (options[:manual_ack])? true : false
          delivery_info, properties, payload = queue.pop
          res = {:message => payload}
          res[:ack] = delivery_info.delivery_tag if options[:manual_ack]
          return res
        end

        def close
          @connection.close
        end

      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
prometheus-splash-0.1.1 lib/splash/transports/rabbitmq.rb