lib/waffle/transports/rabbitmq.rb in waffle-0.3.5 vs lib/waffle/transports/rabbitmq.rb in waffle-0.4.0
- old
+ new
@@ -1,62 +1,46 @@
require 'date'
require 'bunny'
module Waffle
module Transports
- class Rabbitmq
-
- CONNECTION_ATTEMPT_TIMEOUT = 30
-
+ class Rabbitmq < Base
EXCHANGE = 'events'
- @@last_connection_attempt = Time.now
-
- def initialize
- @bunny = Bunny.new Waffle::Config.url
- connect
- end
-
- def encoder
- @encoder ||= eval("Waffle::Encoders::#{Waffle::Config.encoder.capitalize}")
- end
-
def publish(flow = 'events', message = '')
- begin
- @exchange = @bunny.exchange EXCHANGE
- @exchange.publish message, :key => flow
- rescue
- if (Time.now - @@last_connection_attempt) > CONNECTION_ATTEMPT_TIMEOUT
- connect
- end
- end
+ exchange.publish(Waffle.encoder.encode(message), :key => flow)
end
def subscribe(flow = 'events')
- @exchange = @bunny.exchange EXCHANGE
- @queue = @bunny.queue '', :durable => true, :auto_delete => true
-
- if flow.is_a? Array
- flow.each{ |f| @queue.bind @exchange, :key => f }
+ if flow.is_a?(Array)
+ flow.each{|f| queue.bind(exchange, :key => f)}
else
- @queue.bind @exchange, :key => flow
+ queue.bind(exchange, :key => flow)
end
- @queue.subscribe do |message|
- yield message[:delivery_details][:routing_key], encoder.decode(message[:payload])
+ queue.subscribe do |message|
+ yield(message[:delivery_details][:routing_key], Waffle.encoder.decode(message[:payload]))
end
end
+ def connection_exceptions
+ [Bunny::ServerDownError, Bunny::ConnectionError, Errno::ECONNRESET]
+ end
+
private
+ def exchange
+ @exchange ||= @bunny.exchange(EXCHANGE)
+ end
- def connect
- begin
- @@last_connection_attempt = Time.now
- @bunny.start
- rescue
- nil
- end
- end
+ def queue
+ @queue ||= @bunny.queue('', :durable => true, :auto_delete => true)
+ end
+ def do_connect
+ @exchange = nil
+ @queue = nil
+ @bunny = Bunny.new(Waffle.config.url)
+ @bunny.start
+ end
end
end
end