lib/waffle/transports/rabbitmq.rb in waffle-0.4.0 vs lib/waffle/transports/rabbitmq.rb in waffle-0.5.0
- old
+ new
@@ -4,42 +4,42 @@
module Waffle
module Transports
class Rabbitmq < Base
EXCHANGE = 'events'
- def publish(flow = 'events', message = '')
- exchange.publish(Waffle.encoder.encode(message), :key => flow)
+ protected
+ def publish_impl(flow = 'events', message = '')
+ exchange.publish(encoder.encode(message), :key => flow)
end
- def subscribe(flow = 'events')
+ def subscribe_impl(flow = 'events')
if flow.is_a?(Array)
flow.each{|f| queue.bind(exchange, :key => f)}
else
queue.bind(exchange, :key => flow)
end
queue.subscribe do |message|
- yield(message[:delivery_details][:routing_key], Waffle.encoder.decode(message[:payload]))
+ yield(message[:delivery_details][:routing_key], encoder.decode(message[:payload]))
end
end
def connection_exceptions
[Bunny::ServerDownError, Bunny::ConnectionError, Errno::ECONNRESET]
end
- private
def exchange
- @exchange ||= @bunny.exchange(EXCHANGE)
+ @exchange ||= @bunny.exchange(config.options['exchange'] || EXCHANGE)
end
def queue
- @queue ||= @bunny.queue('', :durable => true, :auto_delete => true)
+ @queue ||= @bunny.queue(config.options['queue'] || '', :durable => true, :auto_delete => true)
end
def do_connect
@exchange = nil
@queue = nil
- @bunny = Bunny.new(Waffle.config.url)
+ @bunny = Bunny.new(config.url)
@bunny.start
end
end
end
end