Sha256: 25f3c6cb6680bc40a863f50519c6502215e87c3f0c470bdc3c6094ce342ecdbb

Contents?: true

Size: 933 Bytes

Versions: 1

Compression:

Stored size: 933 Bytes

Contents

require 'bunny'

module Waffle
  module Transports
    # RabbitMQ transport built on top of the Bunny client.
    #
    # Every message is published to, and consumed from, the single exchange
    # named by EXCHANGE; the "flow" is passed to Bunny as the :key option.
    class Rabbitmq

      # Name of the exchange used by both #publish and #subscribe.
      EXCHANGE = 'events'.freeze

      # Creates a Bunny connection for Waffle::Config.url and starts it
      # immediately.
      def initialize
        @bunny = Bunny.new(Waffle::Config.url)
        @bunny.start
      end

      # Resolves (and memoizes) the encoder named by Waffle::Config.encoder,
      # e.g. "json" or :json => Waffle::Encoders::Json.
      #
      # The constant is looked up with const_get rather than eval so that a
      # configuration value can never be executed as Ruby code. Passing
      # +false+ restricts the lookup to Waffle::Encoders itself (no fallback
      # to top-level constants).
      #
      # @return [Object] the constant found under Waffle::Encoders
      # @raise [NameError] if the name is invalid or no such encoder exists
      def encoder
        @encoder ||= Waffle::Encoders.const_get(Waffle::Config.encoder.to_s.capitalize, false)
      end

      # Publishes +message+ to the exchange with +flow+ as the :key option.
      # The message is sent as given; this method does not encode it.
      #
      # @param flow [String] key to publish under
      # @param message [String] payload handed to the exchange
      # @return whatever the exchange's #publish returns
      def publish(flow = 'events', message = '')
        events_exchange.publish(message, :key => flow)
      end

      # Binds a queue to the exchange for one or several flows and yields
      # each received message to the caller's block as
      # (routing_key, decoded_payload).
      #
      # @param flow [String, Array<String>] a single key or a list of keys
      # @yield [routing_key, payload] payload is the result of encoder.decode
      # @return whatever the queue's #subscribe returns
      def subscribe(flow = 'events')
        exchange = events_exchange
        # Empty queue name — presumably lets the broker generate one; confirm
        # against the Bunny version in use.
        @queue = @bunny.queue('', :durable => true, :auto_delete => true)

        # Treat a single flow as a one-element list so both cases share the
        # same binding code.
        routing_keys = flow.is_a?(Array) ? flow : [flow]
        routing_keys.each { |key| @queue.bind(exchange, :key => key) }

        @queue.subscribe do |message|
          yield message[:routing_key], encoder.decode(message[:payload])
        end
      end

      private

      # Declares the shared exchange on the current connection and remembers
      # it in @exchange. Called on every publish/subscribe, exactly as the
      # inline declarations it replaces were.
      def events_exchange
        @exchange = @bunny.exchange(EXCHANGE)
      end

    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
waffle-0.3.2 lib/waffle/transports/rabbitmq.rb