lib/lolitra/handler_base.rb in lolitra-0.1.3 vs lib/lolitra/handler_base.rb in lolitra-0.2.0
- old
+ new
@@ -16,10 +16,19 @@
def self.logger=(new_logger)
@@logger = new_logger
end
+ def self.log_exception(e)
+ logger.error(e.message)
+ logger.error(e.backtrace.join("\n\t"))
+ end
+
+ def self.publish(message)
+ Lolitra::MessageHandlerManager.publish(message)
+ end
+
module MessageHandler
module Helpers
def self.underscore(arg)
word = arg.dup
word.gsub!(/::/, '/')
@@ -54,17 +63,14 @@
base.starters = []
base.is_stateful = false
end
def handle(message)
- Lolitra::logger.debug("Message recived: #{message.class.message_key}")
- Lolitra::logger.debug("#{message}")
begin
get_handler(message).handle(message)
- rescue NoMethodError => e
- raise NoHandlerMessageException.new(self, message) if e.message == "undefined method `handle' for nil:NilClass"
- raise
+ rescue => e
+ Lolitra::log_exception(e)
end
end
def publish(message)
#TODO: IoC
@@ -202,11 +208,13 @@
base.send :extend, MessageClass
end
def initialize(hash={})
super()
- self.replace(hash)
+ hash.keys.each do |key|
+ self.send "#{key}=", hash[key] if self.respond_to? "#{key}="
+ end
end
def marshall
JSON.generate(self)
end
@@ -220,10 +228,12 @@
end
def subscribe(message_class, handler_class)
EM::next_tick do
@socketClient.subscribe(message_class.message_key) do |payload|
+ Lolitra::logger.debug("Message recived:")
+ Lolitra::logger.debug("#{payload}")
handler_class.handle(message_class.unmarshall(payload))
end
end
end
@@ -236,40 +246,84 @@
attr_accessor :queue_prefix
attr_accessor :connection
attr_accessor :exchange
def initialize(hash = {})
- params = hash.reject { |key, value| !value }
+ Lolitra::MessageHandlerManager.bus = self
+
+ @channels = {}
+ @params = hash.reject { |key, value| !value }
raise "no :exchange specified" unless hash[:exchange]
self.queue_prefix = hash[:queue_prefix]||""
-
AMQP::Utilities::EventLoopHelper.run do
- self.connection = AMQP.connect(params)
- self.exchange = AMQP::Channel.new(self.connection).topic(params[:exchange], :durable => true)
+ self.connection = AMQP.start(@params) do |connection|
+ channel = create_channel(connection) do |channel|
+ begin
+ self.exchange = channel.topic(@params[:exchange], :durable => true)
+
+ @params[:pull_subscribers].each do |handler|
+ Lolitra::MessageHandlerManager.register_pull_subscriber(handler)
+ end
+ rescue => e
+ Lolitra::logger.debug("error")
+ Lolitra::log_exception(e)
+ end
+ end
+ end
end
end
def subscribe(message_class, handler_class)
create_queue(message_class, handler_class, {:exclusive => true, :durable => false}, "")
end
def pull_subscribe(message_class, handler_class)
- create_queue(message_class, handler_class, {:durable => true}, queue_prefix + MessageHandler::Helpers.underscore(handler_class.name))
+ create_queue(message_class, handler_class, {:durable => true})
end
def publish(message)
+ #TODO: if exchange channel is closed doesn't log anything
self.exchange.publish(message.marshall, :routing_key => message.class.message_key, :timestamp => Time.now.to_i)
end
private
- def create_queue(message_class, handler_class, options, queue_name)
- EM.next_tick do
- channel = AMQP::Channel.new(self.connection)
- channel.prefetch(1).queue(queue_name, options).bind(self.exchange, :routing_key => message_class.message_key).subscribe do |info, payload|
- message_class_tmp = handler_class.handlers[info.routing_key][0]
- handler_class.handle(message_class_tmp.unmarshall(payload))
+ def create_channel(connection, &block)
+ channel = AMQP::Channel.new(connection) do
+ channel.on_error do |channel, close|
+ Lolitra::logger.error("Channel error: #{channel}")
+ Lolitra::logger.error(close)
end
+ block.call(channel)
+ end
+ channel
+ end
+
+ def create_queue(message_class, handler_class, options)
+ begin
+ queue_name = queue_prefix + MessageHandler::Helpers.underscore(handler_class.name)
+
+ create_channel(self.connection) do |channel|
+ channel.queue(queue_name, options).bind(self.exchange, :routing_key => message_class.message_key)
+ channel.close
+ end
+
+ if !@channels[queue_name] #Only one subscriber by queue_name
+ @channels[queue_name] = create_channel(self.connection) do |channel|
+ channel.prefetch(1).queue(queue_name, options).subscribe do |info, payload|
+ begin
+ Lolitra::logger.debug("Message recived: #{info.routing_key}")
+ Lolitra::logger.debug("#{payload}")
+ message_class_tmp = handler_class.handlers[info.routing_key][0]
+ handler_class.handle(message_class_tmp.unmarshall(payload))
+ rescue => e
+ Lolitra::log_exception(e)
+ end
+ end
+ end
+ end
+ rescue => e
+ Lolitra::log_exception(e)
end
end
end
end