Sha256: c1aeede0cc9de996155ffa2320c2e6b518e82860f95c4e7d40a3f1a2838292e6
Contents?: true
Size: 1.51 KB
Versions: 2
Compression:
Stored size: 1.51 KB
Contents
require 'rubygems' require 'SecureRandom' module Factor module Runtime class MessageBus attr_accessor :host, :connection, :channel, :exchange, :queue def initialize(host="queue.factor.io") @host = host end # Creates the connection and creates a topic exchange # An exchange references a place to send messages to # the exchange routes it to the queues based on the route_key def start(topic="workflow",&code) EventMachine.run do @connection = AMQP.connect(:host=>@host) @channel = AMQP::Channel.new(connection) @exchange = @channel.topic(topic,:auto_delete=>true) # new topic exchange code.call end end # creates a new queue to listen to the topic exchange def listen(routing_key="#",&code) queue_name=SecureRandom.hex @queue = @channel.queue(queue_name) @queue.bind(@exchange, :routing_key=>routing_key) # bind queue to the Exchange @queue.subscribe do |headers,payload| message = Message.new message.from_queue headers.routing_key, payload code.call(message) end end def send(message) @exchange.publish(message.payload,:routing_key => message.route) end def send_and_close(message) send(message) EM.add_timer(1, Proc.new { close}) end def close @connection.close{ EventMachine.stop } end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
factor-0.0.5 | lib/runtime/message_bus.rb |
factor-0.0.4 | lib/runtime/message_bus.rb |