require 'amqp' module AMQPConnectionDriver def connection_driver_initialize puts "AMQPConnectionDriver is loaded!" end def get_connection new_opts = { :vhost => ENV["BROKER_VHOST"] || "eventbus", :host => ENV["BROKER_HOST"] || "localhost", :port => ENV["BROKER_PORT"] || 5672, :user => ENV["BROKER_USER"] || "eventbus", :pass => ENV["BROKER_PASS"] || "eventbus", # :logging => opts.delete(:logging) || false, #:logfile => opts.delete(:logfile) || STDOUT } return AMQP.connect(new_opts) end def send_raw(content, opts = {}) queue_name = opts.delete(:queue_name) EventMachine.run do client = get_connection channel = AMQP::Channel.new(client) exchange = channel.direct('eventbus', :durable => true, :persistent => true, :immediate => false, :auto_delete => false) puts "Declaring queue: #{queue_name}" q = channel.queue(queue_name, :durable => true, :persistent => true, :immediate => false, :auto_delete => false) q.bind(exchange, :routing_key => queue_name) puts "Sending message..." exchange.publish(content, :routing_key => queue_name) do puts "Published to #{queue_name}: #{content}" client.close { EventMachine.stop } end end end def watch_queue(listen_queue) EventMachine.run do client = get_connection channel = AMQP::Channel.new(client) @logger.info "Listening on #{listen_queue}" exchange = channel.direct('eventbus', :durable => true, :persistent => true, :immediate => false, :auto_delete => false) q = channel.queue(listen_queue, :durable => true, :persistent => true, :immediate => false, :auto_delete => false) q.bind(exchange, :routing_key => listen_queue) q.subscribe do |headers, payload| yield payload end end end end