lib/lolitra/handler_base.rb in lolitra-0.0.2 vs lib/lolitra/handler_base.rb in lolitra-0.0.3

- old
+ new

@@ -1,6 +1,9 @@ require 'singleton' +require 'amqp' +require 'amqp/utilities/event_loop_helper' +require 'json' module Lolitra module MessageHandler module Helpers def self.underscore(word) @@ -173,11 +176,13 @@ def initialize(hash={}) hash.each { |key, value| self.send("#{MessageHandler::Helper.underscore(key)}=", value) } end def marshall - JSON.generate(self) + hash = {} + self.instance_variables.each {|var| hash[var.to_s.delete("@")] = self.instance_variable_get(var) } + JSON.generate(hash) end end module AmqpMessageHandler module AmqpMessageHandlerClass @@ -205,27 +210,36 @@ end class AmqpBus attr_accessor :queue_prefix + attr_accessor :connection attr_accessor :exchange def initialize(hash = {}) - self.queue_prefix = hash[:queue_prefix] - self.exchange = hash[:exchange] + params = hash.reject { |key, value| !value } + raise "no :exchange specified" unless hash[:exchange] + + self.queue_prefix = hash[:queue_prefix]||"" + + AMQP::Utilities::EventLoopHelper.run do + self.connection = AMQP.connect(params) + self.exchange = AMQP::Channel.new(self.connection).topic(params[:exchange], :durable => true) + end end def subscribe(message_class, handler_class) EM.next_tick do - AmqpNotifier.new(exchange, queue_prefix + MessageHandler::Helper.underscore(handler_class.name).subscribe(message_class.message_key)) do |info, payload| - #all message with the same handler goes here to try to maintain the order in the same queue (not ensured) + channel = AMQP::Channel.new(self.connection) + channel.prefetch(1).queue(queue_prefix + MessageHandler::Helpers.underscore(handler_class.name), :dureable => true).bind(self.exchange, :routing_key => message_class.message_key).subscribe do |info, payload| message_class_tmp = handler_class.message_class_by_key[info.routing_key] handler_class.handle(message_class_tmp.unmarshall(payload)) - end + end end end def publish(message) - AmqpNotifier.new(exchange).publish(message.class.message_key, message.marshall) + self.exchange.publish(message.marshall, :routing_key => message.class.message_key, :timestamp => Time.now.to_i) end + end end