Sha256: a20aec0759edb5ae8075e5d2350a3fb5307057d059c95c331648b19d0c55e302
Contents?: true
Size: 1.11 KB
Versions: 2
Compression:
Stored size: 1.11 KB
Contents
require 'date'
require 'bunny'

module Waffle
  module Transports
    # Transport that publishes and consumes messages over RabbitMQ using
    # the Bunny client.
    #
    # Every method below is protected; they are presumably the hooks that
    # Base's public API calls into (Base is not visible here - confirm).
    # `config` and `encoder` are likewise expected to come from Base.
    class Rabbitmq < Base
      # Exchange name used when config.options has no 'exchange' entry.
      EXCHANGE = 'events'

      protected

      # Encodes +message+ and publishes it to the exchange with +flow+ as
      # the routing key.
      def publish_impl(flow = 'events', message = '')
        payload = encoder.encode(message)
        exchange.publish(payload, :key => flow)
      end

      # Binds the queue to the exchange for +flow+ (one routing key, or an
      # Array of routing keys) and subscribes to the queue. For each
      # received message, +block+ is called with the message's routing key
      # and its decoded payload.
      def subscribe_impl(flow = 'events', &block)
        routing_keys = flow.is_a?(Array) ? flow : [flow]
        routing_keys.each do |routing_key|
          queue.bind(exchange, :key => routing_key)
        end

        queue.subscribe do |delivery|
          routing_key = delivery[:delivery_details][:routing_key]
          decoded = encoder.decode(delivery[:payload])
          block.call(routing_key, decoded)
        end
      end

      # Exception classes this transport reports as connection failures.
      def connection_exceptions
        [
          Bunny::ServerDownError,
          Bunny::ConnectionError,
          Errno::ECONNRESET
        ]
      end

      # Memoized exchange handle; the name comes from
      # config.options['exchange'], falling back to EXCHANGE.
      def exchange
        @exchange ||= begin
          exchange_name = config.options['exchange'] || EXCHANGE
          @bunny.exchange(exchange_name)
        end
      end

      # Memoized queue handle, declared durable and auto-delete. The name
      # comes from config.options['queue'], falling back to an empty string.
      def queue
        @queue ||= begin
          queue_name = config.options['queue'] || ''
          @bunny.queue(queue_name, :durable => true, :auto_delete => true)
        end
      end

      # Opens a new Bunny connection to config.url and starts it. The
      # memoized exchange and queue are cleared first so they are
      # re-declared against the new connection on next use.
      def do_connect
        @exchange = @queue = nil
        @bunny = Bunny.new(config.url)
        @bunny.start
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
waffle-0.6.0 | lib/waffle/transports/rabbitmq.rb |
waffle-0.5.1 | lib/waffle/transports/rabbitmq.rb |