lib/waffle/transports/rabbitmq.rb in waffle-0.4.0 vs lib/waffle/transports/rabbitmq.rb in waffle-0.5.0

- old
+ new

@@ -4,42 +4,42 @@ module Waffle module Transports class Rabbitmq < Base EXCHANGE = 'events' - def publish(flow = 'events', message = '') - exchange.publish(Waffle.encoder.encode(message), :key => flow) + protected + def publish_impl(flow = 'events', message = '') + exchange.publish(encoder.encode(message), :key => flow) end - def subscribe(flow = 'events') + def subscribe_impl(flow = 'events') if flow.is_a?(Array) flow.each{|f| queue.bind(exchange, :key => f)} else queue.bind(exchange, :key => flow) end queue.subscribe do |message| - yield(message[:delivery_details][:routing_key], Waffle.encoder.decode(message[:payload])) + yield(message[:delivery_details][:routing_key], encoder.decode(message[:payload])) end end def connection_exceptions [Bunny::ServerDownError, Bunny::ConnectionError, Errno::ECONNRESET] end - private def exchange - @exchange ||= @bunny.exchange(EXCHANGE) + @exchange ||= @bunny.exchange(config.options['exchange'] || EXCHANGE) end def queue - @queue ||= @bunny.queue('', :durable => true, :auto_delete => true) + @queue ||= @bunny.queue(config.options['queue'] || '', :durable => true, :auto_delete => true) end def do_connect @exchange = nil @queue = nil - @bunny = Bunny.new(Waffle.config.url) + @bunny = Bunny.new(config.url) @bunny.start end end end end