lib/lolitra/handler_base.rb in lolitra-0.0.5 vs lib/lolitra/handler_base.rb in lolitra-0.1.0b

- old
+ new

@@ -1,14 +1,30 @@ require 'singleton' +require 'log4r' require 'amqp' require 'amqp/utilities/event_loop_helper' require 'json' +require 'fiber' module Lolitra + include Log4r + + @@logger = Logger.new 'lolitra' + @@logger.outputters = Outputter.stdout + + def self.logger + @@logger + end + + def self.logger=(new_logger) + @@logger = new_logger + end + module MessageHandler module Helpers - def self.underscore(word) + def self.underscore(arg) + word = arg.dup word.gsub!(/::/, '/') word.gsub!(/([A-Z]+)([A-Z][a-z])/,'\1_\2') word.gsub!(/([a-z\d])([A-Z])/,'\1_\2') word.tr!("-", "_") word.downcase! @@ -102,10 +118,14 @@ def self.included(base) base.send :extend, MessageHandlerClass end + def publish(message) + self.class.publish(message) + end + def handle(message) handler_method = self.class.handlers[message.class.message_key][1] raise "Can't handle message #{message.class}" unless handler_method self.send(handler_method, message) end @@ -163,11 +183,11 @@ def message_key(key = nil) if (key) self.class_message_key = key else - self.class_message_key || "#{MessageHandler::Helper.underscore(self.class.name)}" + self.class_message_key || "#{MessageHandler::Helpers.underscore(self.class.name)}" end end def unmarshall(message_json) hash = JSON.parse(message_json) @@ -179,20 +199,44 @@ def self.included(base) base.send :extend, MessageClass end def initialize(hash={}) - hash.each { |key, value| self.send("#{MessageHandler::Helper.underscore(key)}=", value) } + hash.each { |key, value| self.send("#{MessageHandler::Helpers.underscore(key)}=", value) } end - def marshall + def to_hash hash = {} - self.instance_variables.each {|var| hash[var.to_s.delete("@")] = self.instance_variable_get(var) } - JSON.generate(hash) + self.instance_variables.each {|var| hash[var.to_s.delete("@").to_sym] = self.instance_variable_get(var) } + hash end + + def marshall + JSON.generate(to_hash) + end end + class FayeBus + def initialize(options = {}) + EM::next_tick do + @socketClient = Faye::Client.new(options[:url] || 'http://localhost:9292/faye') + end + end + + def subscribe(message_class, handler_class) + EM::next_tick do + @socketClient.subscribe(message_class.message_key) do |payload| + handler_class.handle(message_class.unmarshall(payload)) + end + end + end + + def publish(message) + @socketClient.publish(message.class.message_key, message.marshall) + end + end + class AmqpBus attr_accessor :queue_prefix attr_accessor :connection attr_accessor :exchange @@ -215,19 +259,44 @@ def pull_subscribe(message_class, handler_class) create_queue(message_class, handler_class, {:durable => true}, queue_prefix + MessageHandler::Helpers.underscore(handler_class.name)) end def publish(message) + Lolitra::logger.debug("Message sent: #{message.class.message_key}") + Lolitra::logger.debug("#{message.marshall}") self.exchange.publish(message.marshall, :routing_key => message.class.message_key, :timestamp => Time.now.to_i) end private def create_queue(message_class, handler_class, options, queue_name) EM.next_tick do channel = AMQP::Channel.new(self.connection) - channel.prefetch(1).queue(queue_name, options).bind(self.exchange, :routing_key => message_class.message_key).subscribe do |info, payload| - message_class_tmp = handler_class.handlers[info.routing_key][0] - handler_class.handle(message_class_tmp.unmarshall(payload)) + channel.prefetch(1).queue(queue_name, options).bind(self.exchange, :routing_key => message_class.message_key).subscribe(:ack => true) do |info, payload| + Fiber.new do + current_fiber = Fiber.current + for i in (0..5) + begin + Lolitra::logger.debug("Message recived: #{info.routing_key}") + Lolitra::logger.debug("#{payload}") + message_class_tmp = handler_class.handlers[info.routing_key][0] + handler_class.handle(message_class_tmp.unmarshall(payload)) + info.ack + Lolitra::logger.debug("Message processed") + break + rescue => e + Lolitra::logger.error("Try #{i}: #{e.message}") + if (i!=5) + EventMachine.add_timer(5) do + current_fiber.resume + end + Fiber.yield + else + Lolitra::logger.error(e.backtrace.join("\n\t")) + info.reject(:requeue => false) + end + end + end + end.resume end end end end end