lib/waffle/transports/rabbitmq.rb in waffle-0.3.3 vs lib/waffle/transports/rabbitmq.rb in waffle-0.3.4
- old
+ new
@@ -1,39 +1,62 @@
+require 'date'
require 'bunny'
module Waffle
module Transports
class Rabbitmq
+ CONNECTION_ATTEMPT_TIMEOUT = 30
+
EXCHANGE = 'events'
+ @@last_connection_attempt = Time.now
+
def initialize
@bunny = Bunny.new Waffle::Config.url
- @bunny.start
+ connect
end
def encoder
@encoder ||= eval("Waffle::Encoders::#{Waffle::Config.encoder.capitalize}")
end
def publish(flow = 'events', message = '')
- @exchange = @bunny.exchange EXCHANGE
- @exchange.publish message, :key => flow
+ begin
+ @exchange = @bunny.exchange EXCHANGE
+ @exchange.publish message, :key => flow
+ rescue
+ if (Time.now - @@last_connection_attempt) > CONNECTION_ATTEMPT_TIMEOUT
+ connect
+ end
+ end
end
def subscribe(flow = 'events')
@exchange = @bunny.exchange EXCHANGE
@queue = @bunny.queue '', :durable => true, :auto_delete => true
+
if flow.is_a? Array
flow.each{ |f| @queue.bind @exchange, :key => f }
else
@queue.bind @exchange, :key => flow
end
@queue.subscribe do |message|
yield message[:delivery_details][:routing_key], encoder.decode(message[:payload])
end
end
+
+ private
+
+ def connect
+ begin
+ @@last_connection_attempt = Time.now
+ @bunny.start
+ rescue
+ nil
+ end
+ end
end
end
end