Sha256: 0ead021bee62062f84a8cfc36d9fc63ef37a4c90f6939458330cfffae0c462d0

Contents?: true

Size: 910 Bytes

Versions: 1

Compression:

Stored size: 910 Bytes

Contents

require 'bunny'

module Waffle
  module Transports
    class Rabbitmq

      EXCHANGE = 'events'

      def initialize
        @bunny = Bunny.new Waffle::Config.url
        @bunny.start
      end

      def encoder
        @encoder ||= eval("Waffle::Encoders::#{Waffle::Config.encoder.capitalize}")
      end

      def publish(flow = 'events', message = '')
        @exchange = @bunny.exchange EXCHANGE
        @exchange.publish message, :key => flow
      end

      def subscribe(flow = 'events')
        @exchange = @bunny.exchange EXCHANGE
        @queue    = @bunny.queue '', :durable => true, :auto_delete => true
        if flow.is_a? Array
          flow.each{ |f| @queue.bind @exchange, :key => f }
        else
          @queue.bind @exchange, :key => flow
        end

        @queue.subscribe do |message|
          yield encoder.decode(message[:payload])
        end
      end

    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
waffle-0.3.1 lib/waffle/transports/rabbitmq.rb