Sha256: 888cf7160607fcac02cf3db35faf8cfc845931e7fbda1b1fff9eda6c0d5bf40d
Contents?: true
Size: 1.83 KB
Versions: 1
Compression:
Stored size: 1.83 KB
Contents
require 'rubygems'
require 'securerandom'
require 'eventmachine'
require 'amqp'

module Factor
  module Runtime
    # Wrapper around an AMQP topic exchange driven by an EventMachine reactor.
    #
    # Typical flow: call #start with a block; inside that block use #listen to
    # subscribe to routing keys and #send to publish messages; call #close (or
    # pass close=true to #send) to shut the connection and reactor down.
    class MessageBus
      attr_accessor :host, :vhost, :username, :token, :connection, :channel, :exchange, :queue

      # Stores the connection parameters. Nothing is connected until #start.
      #
      # @param email [String] used as both the vhost and the username
      # @param token [String] stored as the token/password
      def initialize(email, token)
        @host     = "queue.factor.io"
        @vhost    = email
        @username = email
        @token    = token
      end

      # Creates the connection and creates a topic exchange.
      # An exchange references a place to send messages to;
      # the exchange routes it to the queues based on the route_key.
      #
      # Runs inside EventMachine.run; the given block is invoked once the
      # connection, channel and exchange objects have been created.
      #
      # @param topic [String] name of the topic exchange
      # @yield called with no arguments inside the reactor; optional
      def start(topic = "factor", &code)
        EventMachine.run do
          # NOTE(review): only :host is passed; @username, @token and @vhost are
          # stored but not used when connecting. The full settings are kept
          # below, commented out as in the original -- confirm which is intended.
          # connection_settings={:host=>@host,:user=>@username,:password=>@token,:vhost=>@vhost}
          connection_settings = { :host => @host }
          @connection = AMQP.connect(connection_settings)
          @channel    = AMQP::Channel.new(@connection)
          @exchange   = @channel.topic(topic, :auto_delete => true) # new topic exchange
          # Guard against a missing block instead of failing with NoMethodError on nil.
          code.call if code
        end
      end

      # Creates a new, randomly named queue bound to the topic exchange and
      # subscribes to it. Must be called after #start has created the channel
      # and exchange.
      #
      # @param routing_key [String] binding pattern; "#" is the default
      # @yield [payload] called with each received message payload; optional
      def listen(routing_key = "#", &code)
        queue_name = SecureRandom.hex
        @queue = @channel.queue(queue_name)
        @queue.bind(@exchange, :routing_key => routing_key) # bind queue to the Exchange
        @queue.subscribe do |headers, payload|
          puts "[Received Message (#{headers.routing_key})] #{payload.inspect}"
          code.call(payload) if code
        end
      end

      # Publishes a message to the exchange.
      #
      # NOTE: this overrides Object#send for MessageBus instances; use
      # __send__ if dynamic method dispatch is needed on this object.
      #
      # @param message [#route, #payload] object providing the routing key
      #   (route) and the body (payload)
      # @param close [Boolean] when truthy, schedule #close one second after
      #   publishing
      def send(message, close = false)
        puts "[Sending Message (#{message.route})] #{message.payload.inspect}"
        @exchange.publish(message.payload, :routing_key => message.route)
        # The `close` parameter shadows the #close method inside this method, so
        # the method has to be called with an explicit receiver. A bare `close`
        # here would only evaluate the boolean argument and never shut down.
        EM.add_timer(1) { self.close } if close
      end

      # Closes the AMQP connection and stops the EventMachine reactor once the
      # connection's close callback fires.
      def close
        @connection.close { EventMachine.stop }
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
factor-0.1.10 | lib/runtime/message_bus.rb |