lib/lolitra/handler_base.rb in lolitra-0.1.3 vs lib/lolitra/handler_base.rb in lolitra-0.2.0

- old
+ new

@@ -16,10 +16,19 @@ def self.logger=(new_logger) @@logger = new_logger end + def self.log_exception(e) + logger.error(e.message) + logger.error(e.backtrace.join("\n\t")) + end + + def self.publish(message) + Lolitra::MessageHandlerManager.publish(message) + end + module MessageHandler module Helpers def self.underscore(arg) word = arg.dup word.gsub!(/::/, '/') @@ -54,17 +63,14 @@ base.starters = [] base.is_stateful = false end def handle(message) - Lolitra::logger.debug("Message recived: #{message.class.message_key}") - Lolitra::logger.debug("#{message}") begin get_handler(message).handle(message) - rescue NoMethodError => e - raise NoHandlerMessageException.new(self, message) if e.message == "undefined method `handle' for nil:NilClass" - raise + rescue => e + Lolitra::log_exception(e) end end def publish(message) #TODO: IoC @@ -202,11 +208,13 @@ base.send :extend, MessageClass end def initialize(hash={}) super() - self.replace(hash) + hash.keys.each do |key| + self.send "#{key}=", hash[key] if self.respond_to? "#{key}=" + end end def marshall JSON.generate(self) end @@ -220,10 +228,12 @@ end def subscribe(message_class, handler_class) EM::next_tick do @socketClient.subscribe(message_class.message_key) do |payload| + Lolitra::logger.debug("Message recived:") + Lolitra::logger.debug("#{payload}") handler_class.handle(message_class.unmarshall(payload)) end end end @@ -236,40 +246,84 @@ attr_accessor :queue_prefix attr_accessor :connection attr_accessor :exchange def initialize(hash = {}) - params = hash.reject { |key, value| !value } + Lolitra::MessageHandlerManager.bus = self + + @channels = {} + @params = hash.reject { |key, value| !value } raise "no :exchange specified" unless hash[:exchange] self.queue_prefix = hash[:queue_prefix]||"" - AMQP::Utilities::EventLoopHelper.run do - self.connection = AMQP.connect(params) - self.exchange = AMQP::Channel.new(self.connection).topic(params[:exchange], :durable => true) + self.connection = AMQP.start(@params) do |connection| + channel = create_channel(connection) do |channel| + begin + self.exchange = channel.topic(@params[:exchange], :durable => true) + + @params[:pull_subscribers].each do |handler| + Lolitra::MessageHandlerManager.register_pull_subscriber(handler) + end + rescue => e + Lolitra::logger.debug("error") + Lolitra::log_exception(e) + end + end + end end end def subscribe(message_class, handler_class) create_queue(message_class, handler_class, {:exclusive => true, :durable => false}, "") end def pull_subscribe(message_class, handler_class) - create_queue(message_class, handler_class, {:durable => true}, queue_prefix + MessageHandler::Helpers.underscore(handler_class.name)) + create_queue(message_class, handler_class, {:durable => true}) end def publish(message) + #TODO: if exchange channel is closed doesn't log anything self.exchange.publish(message.marshall, :routing_key => message.class.message_key, :timestamp => Time.now.to_i) end private - def create_queue(message_class, handler_class, options, queue_name) - EM.next_tick do - channel = AMQP::Channel.new(self.connection) - channel.prefetch(1).queue(queue_name, options).bind(self.exchange, :routing_key => message_class.message_key).subscribe do |info, payload| - message_class_tmp = handler_class.handlers[info.routing_key][0] - handler_class.handle(message_class_tmp.unmarshall(payload)) + def create_channel(connection, &block) + channel = AMQP::Channel.new(connection) do + channel.on_error do |channel, close| + Lolitra::logger.error("Channel error: #{channel}") + Lolitra::logger.error(close) end + block.call(channel) + end + channel + end + + def create_queue(message_class, handler_class, options) + begin + queue_name = queue_prefix + MessageHandler::Helpers.underscore(handler_class.name) + + create_channel(self.connection) do |channel| + channel.queue(queue_name, options).bind(self.exchange, :routing_key => message_class.message_key) + channel.close + end + + if !@channels[queue_name] #Only one subscriber by queue_name + @channels[queue_name] = create_channel(self.connection) do |channel| + channel.prefetch(1).queue(queue_name, options).subscribe do |info, payload| + begin + Lolitra::logger.debug("Message recived: #{info.routing_key}") + Lolitra::logger.debug("#{payload}") + message_class_tmp = handler_class.handlers[info.routing_key][0] + handler_class.handle(message_class_tmp.unmarshall(payload)) + rescue => e + Lolitra::log_exception(e) + end + end + end + end + rescue => e + Lolitra::log_exception(e) end end end end