Sha256: 697160bcb455cc9c1b15cf079c1b415e81e97e28fa02cadefe2149007776aa74
Contents?: true
Size: 1.36 KB
Versions: 2
Compression:
Stored size: 1.36 KB
Contents
require 'date'
require 'bunny'

module Waffle
  module Transports
    # Transport that publishes to and consumes from the 'events' exchange
    # through a Bunny client built from Waffle::Config.url.
    #
    # Connection and publish failures are swallowed on purpose (best effort):
    # a failed publish is dropped, and a reconnect is attempted at most once
    # per CONNECTION_ATTEMPT_TIMEOUT seconds.
    class Rabbitmq
      # Minimum number of seconds between two reconnection attempts
      # triggered by a failed publish.
      CONNECTION_ATTEMPT_TIMEOUT = 30

      # Name of the exchange used by both #publish and #subscribe.
      EXCHANGE = 'events'

      # Time of the most recent connection attempt. This is a class variable,
      # so the reconnect throttle is shared by every instance of the class.
      @@last_connection_attempt = Time.now

      # Builds the Bunny client and makes a first connection attempt.
      # A failed attempt does not raise (see #connect).
      def initialize
        @bunny = Bunny.new Waffle::Config.url
        connect
      end

      # Returns the encoder constant named by Waffle::Config.encoder,
      # looked up under Waffle::Encoders and memoized per instance.
      #
      # The lookup uses const_get instead of eval so that the configured
      # value can only ever name a constant, never run arbitrary code.
      # Passing +false+ restricts the search to Waffle::Encoders itself.
      #
      # Raises NameError when no such constant exists.
      def encoder
        @encoder ||= Waffle::Encoders.const_get(Waffle::Config.encoder.capitalize, false)
      end

      # Publishes +message+ to the exchange using +flow+ as the :key option.
      #
      # Any StandardError raised while publishing is swallowed; the message
      # is NOT retried. If more than CONNECTION_ATTEMPT_TIMEOUT seconds have
      # passed since the last connection attempt, a reconnect is attempted.
      def publish(flow = 'events', message = '')
        @exchange = @bunny.exchange EXCHANGE
        @exchange.publish message, :key => flow
      rescue
        connect if (Time.now - @@last_connection_attempt) > CONNECTION_ATTEMPT_TIMEOUT
      end

      # Declares a queue, binds it to the exchange once per flow, and yields
      # (routing_key, decoded_payload) to the caller's block for each message.
      #
      # +flow+ may be a single key or an Array of keys. Payloads are decoded
      # with #encoder. Errors raised here are not rescued.
      def subscribe(flow = 'events')
        @exchange = @bunny.exchange EXCHANGE
        @queue = @bunny.queue '', :durable => true, :auto_delete => true

        # Only a real Array is treated as a list of keys; anything else is
        # bound as a single key, exactly as passed.
        flows = flow.is_a?(Array) ? flow : [flow]
        flows.each { |f| @queue.bind @exchange, :key => f }

        @queue.subscribe do |message|
          yield message[:delivery_details][:routing_key], encoder.decode(message[:payload])
        end
      end

      private

      # Records the attempt time, then starts the Bunny client.
      # Returns nil instead of raising when the start fails, so callers can
      # keep running without a broker connection.
      def connect
        @@last_connection_attempt = Time.now
        @bunny.start
      rescue
        nil
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
waffle-0.3.5 | lib/waffle/transports/rabbitmq.rb |
waffle-0.3.4 | lib/waffle/transports/rabbitmq.rb |