lib/lolitra/handler_base.rb in lolitra-0.0.2 vs lib/lolitra/handler_base.rb in lolitra-0.0.3
- old
+ new
@@ -1,6 +1,9 @@
require 'singleton'
+require 'amqp'
+require 'amqp/utilities/event_loop_helper'
+require 'json'
module Lolitra
module MessageHandler
module Helpers
def self.underscore(word)
@@ -173,11 +176,13 @@
def initialize(hash={})
hash.each { |key, value| self.send("#{MessageHandler::Helper.underscore(key)}=", value) }
end
def marshall
- JSON.generate(self)
+ hash = {}
+ self.instance_variables.each {|var| hash[var.to_s.delete("@")] = self.instance_variable_get(var) }
+ JSON.generate(hash)
end
end
module AmqpMessageHandler
module AmqpMessageHandlerClass
@@ -205,27 +210,36 @@
end
class AmqpBus
attr_accessor :queue_prefix
+ attr_accessor :connection
attr_accessor :exchange
def initialize(hash = {})
- self.queue_prefix = hash[:queue_prefix]
- self.exchange = hash[:exchange]
+ params = hash.reject { |key, value| !value }
+ raise "no :exchange specified" unless hash[:exchange]
+
+ self.queue_prefix = hash[:queue_prefix]||""
+
+ AMQP::Utilities::EventLoopHelper.run do
+ self.connection = AMQP.connect(params)
+ self.exchange = AMQP::Channel.new(self.connection).topic(params[:exchange], :durable => true)
+ end
end
def subscribe(message_class, handler_class)
EM.next_tick do
- AmqpNotifier.new(exchange, queue_prefix + MessageHandler::Helper.underscore(handler_class.name).subscribe(message_class.message_key)) do |info, payload|
- #all message with the same handler goes here to try to maintain the order in the same queue (not ensured)
+ channel = AMQP::Channel.new(self.connection)
+ channel.prefetch(1).queue(queue_prefix + MessageHandler::Helpers.underscore(handler_class.name), :dureable => true).bind(self.exchange, :routing_key => message_class.message_key).subscribe do |info, payload|
message_class_tmp = handler_class.message_class_by_key[info.routing_key]
handler_class.handle(message_class_tmp.unmarshall(payload))
- end
+ end
end
end
def publish(message)
- AmqpNotifier.new(exchange).publish(message.class.message_key, message.marshall)
+ self.exchange.publish(message.marshall, :routing_key => message.class.message_key, :timestamp => Time.now.to_i)
end
+
end
end