# BunnyConnectionDriver is the original implementation of
# the AMQP-based broker. I am trying to make this work with the
# straight Ruby AMQP driver, but this one is the default
# until I get that working properly.
#
# The module is meant to be mixed into a class. All includers share a
# single Bunny connection, which is created lazily on first use.
module BunnyConnectionDriver
  # Shared connection; stays nil until get_connection has successfully
  # started one.
  @@BunnyConnector_Connection = nil

  # Announces on stdout that this driver is in use.
  #
  # @return [nil]
  def connection_driver_initialize
    puts "BunnyConnectionDriver is loaded!"
  end

  # Returns the shared Bunny connection, creating and starting it on the
  # first call.
  #
  # Settings are read from the BROKER_VHOST, BROKER_HOST, BROKER_PORT,
  # BROKER_USER and BROKER_PASS environment variables, falling back to
  # "eventbus" / "localhost" / 5672 / "eventbus" / "eventbus".
  #
  # @return [Object] the started connection returned by Bunny.new
  # @raise [ArgumentError] if BROKER_PORT is set but is not a decimal integer
  def get_connection
    if @@BunnyConnector_Connection.nil?
      # ENV values are always Strings; normalise the port so it has the
      # same type whether it comes from the environment or the default.
      port = ENV["BROKER_PORT"]

      new_opts = {
        :vhost => ENV["BROKER_VHOST"] || "eventbus",
        :host => ENV["BROKER_HOST"] || "localhost",
        :port => port ? Integer(port, 10) : 5672,
        :user => ENV["BROKER_USER"] || "eventbus",
        :pass => ENV["BROKER_PASS"] || "eventbus",
        # :logging => opts.delete(:logging) || false,
        # :logfile => opts.delete(:logfile) || STDOUT
      }

      # Cache the connection only once start has succeeded. Otherwise a
      # failed start would leave a never-started connection cached and
      # every later call would hand it out without retrying.
      connection = Bunny.new(new_opts)
      connection.start
      @@BunnyConnector_Connection = connection
    end

    @@BunnyConnector_Connection
  end

  # Publishes +content+ to the 'eventbus' exchange, routed to the queue
  # named by opts[:queue_name]. The queue is declared and bound to the
  # exchange (using its own name as the key) before publishing.
  #
  # @param content [Object] the message body handed to exchange.publish
  # @param opts [Hash] options; only :queue_name is read. The hash is not
  #   modified, so callers can reuse it for further calls.
  # @return [nil]
  def send_raw(content, opts = {})
    queue_name = opts[:queue_name]
    client = get_connection

    exchange = client.exchange('eventbus', :durable => true, :persistent => true, :immediate => false)

    puts "Declaring queue: #{queue_name}"
    q = client.queue(queue_name, :durable => true, :persistent => true, :immediate => false)
    q.bind('eventbus', :key => queue_name)

    puts "Publishing content to #{queue_name}: #{content}"
    exchange.publish(content, :key => queue_name)
    puts "Done!"
  end

  # Declares +listen_queue+, binds it to the 'eventbus' exchange and pops
  # from it, yielding the payload of a popped message to the given block.
  #
  # Uses @logger, which this module never assigns; assumes the including
  # class sets it up -- TODO confirm.
  #
  # @param listen_queue [String] name of the queue, also used as the
  #   binding key
  # @yield [payload] the :payload entry of a popped message
  def watch_queue(listen_queue)
    b = get_connection

    @logger.info "Listening on #{listen_queue}"
    exchange = b.exchange('eventbus', :durable => true, :persistent => true, :immediate => false)
    q = b.queue(listen_queue, :durable => true, :persistent => true, :immediate => false)
    q.bind(exchange, :key => listen_queue)

    q.pop do |msg|
      if msg[:delivery_details].nil?
        # Gotta figure out why this is necessary. Blech.
        # NOTE(review): presumably a pop on an empty queue hands back a
        # message without delivery details; back off briefly -- confirm.
        sleep 0.25
      else
        yield msg[:payload]
      end
    end
  end
end