lib/waffle/transports/rabbitmq.rb in waffle-0.3.5 vs lib/waffle/transports/rabbitmq.rb in waffle-0.4.0

- old
+ new

@@ -1,62 +1,46 @@ require 'date' require 'bunny' module Waffle module Transports - class Rabbitmq - - CONNECTION_ATTEMPT_TIMEOUT = 30 - + class Rabbitmq < Base EXCHANGE = 'events' - @@last_connection_attempt = Time.now - - def initialize - @bunny = Bunny.new Waffle::Config.url - connect - end - - def encoder - @encoder ||= eval("Waffle::Encoders::#{Waffle::Config.encoder.capitalize}") - end - def publish(flow = 'events', message = '') - begin - @exchange = @bunny.exchange EXCHANGE - @exchange.publish message, :key => flow - rescue - if (Time.now - @@last_connection_attempt) > CONNECTION_ATTEMPT_TIMEOUT - connect - end - end + exchange.publish(Waffle.encoder.encode(message), :key => flow) end def subscribe(flow = 'events') - @exchange = @bunny.exchange EXCHANGE - @queue = @bunny.queue '', :durable => true, :auto_delete => true - - if flow.is_a? Array - flow.each{ |f| @queue.bind @exchange, :key => f } + if flow.is_a?(Array) + flow.each{|f| queue.bind(exchange, :key => f)} else - @queue.bind @exchange, :key => flow + queue.bind(exchange, :key => flow) end - @queue.subscribe do |message| - yield message[:delivery_details][:routing_key], encoder.decode(message[:payload]) + queue.subscribe do |message| + yield(message[:delivery_details][:routing_key], Waffle.encoder.decode(message[:payload])) end end + def connection_exceptions + [Bunny::ServerDownError, Bunny::ConnectionError, Errno::ECONNRESET] + end + private + def exchange + @exchange ||= @bunny.exchange(EXCHANGE) + end - def connect - begin - @@last_connection_attempt = Time.now - @bunny.start - rescue - nil - end - end + def queue + @queue ||= @bunny.queue('', :durable => true, :auto_delete => true) + end + def do_connect + @exchange = nil + @queue = nil + @bunny = Bunny.new(Waffle.config.url) + @bunny.start + end end end end