lib/lolitra/handler_base.rb in lolitra-0.2.0 vs lib/lolitra/handler_base.rb in lolitra-0.2.1

- old
+ new

@@ -1,10 +1,12 @@ require 'singleton' require 'log4r' require 'amqp' require 'amqp/utilities/event_loop_helper' require 'json' +require 'fileutils' +require_relative 'rabbitmq_bus' module Lolitra include Log4r @@logger = Logger.new 'lolitra' @@ -25,10 +27,36 @@ def self.publish(message) Lolitra::MessageHandlerManager.publish(message) end + def self.unsubscribe(handler_class, &block) + Lolitra::MessageHandlerManager.unsubscribe(handler_class, &block) + end + + def self.disconnect(&block) + Lolitra::MessageHandlerManager.disconnect(&block) + end + + def self.subscribers + Lolitra::MessageHandlerManager.instance.subscribers.collect do |subscriber| + subscriber.name + end + end + + def self.process_deadletters(subscriber) + Lolitra::MessageHandlerManager.instance.process_deadletters(subscriber) + end + + def self.remove_next_deadletter(subscriber) + Lolitra::MessageHandlerManager.instance.remove_next_deadletter(subscriber) + end + + def self.purge_deadletters(subscriber) + Lolitra::MessageHandlerManager.instance.purge_deadletters(subscriber) + end + module MessageHandler module Helpers def self.underscore(arg) word = arg.dup word.gsub!(/::/, '/') @@ -63,15 +91,11 @@ base.starters = [] base.is_stateful = false end def handle(message) - begin - get_handler(message).handle(message) - rescue => e - Lolitra::log_exception(e) - end + get_handler(message).handle(message) end def publish(message) #TODO: IoC MessageHandlerManager.publish(message) @@ -140,10 +164,11 @@ class MessageHandlerManager include Singleton attr_accessor :bus + attr_accessor :subscribers def self.bus=(new_bus) instance.bus = new_bus end @@ -153,24 +178,40 @@ def self.register_subscriber(handler_class) instance.register_subscriber(handler_class) end + def subscribers + @subscribers ||= [] + end + + def process_deadletters(handler_class) + bus.process_deadletters(handler_class) + end + + def purge_deadletters(handler_class) + bus.purge_deadletters(handler_class) + end + + def remove_next_deadletter(handler_class) + bus.remove_next_deadletter(handler_class) + end + def register_subscriber(handler_class) + subscribers << handler_class handler_class.handle_messages.each do |message_class| bus.subscribe(message_class, handler_class) end end - def self.register_pull_subscriber(handler_class) - instance.register_pull_subscriber(handler_class) + def self.unsubscribe(handler_class, &block) + instance.unsubscribe(handler_class, &block) end - - def register_pull_subscriber(handler_class) - handler_class.handle_messages.each do |message_class| - bus.pull_subscribe(message_class, handler_class) - end + + def unsubscribe(handler_class, &block) + Lolitra::logger.info("Unsubscribing #{handler_class}") + bus.unsubscribe(handler_class, &block) end def self.publish(message) instance.publish(message) end @@ -178,10 +219,18 @@ def publish(message) Lolitra::logger.debug("Message sent: #{message.class.message_key}") Lolitra::logger.debug("#{message.marshall}") bus.publish(message) end + + def self.disconnect(&block) + instance.disconnect(&block) + end + + def disconnect(&block) + bus.disconnect(&block) + end end module Message module MessageClass @@ -238,92 +287,7 @@ end def publish(message) @socketClient.publish(message.class.message_key, message.marshall) end - end - - class AmqpBus - attr_accessor :queue_prefix - attr_accessor :connection - attr_accessor :exchange - - def initialize(hash = {}) - Lolitra::MessageHandlerManager.bus = self - - @channels = {} - @params = hash.reject { |key, value| !value } - raise "no :exchange specified" unless hash[:exchange] - - self.queue_prefix = hash[:queue_prefix]||"" - AMQP::Utilities::EventLoopHelper.run do - self.connection = AMQP.start(@params) do |connection| - channel = create_channel(connection) do |channel| - begin - self.exchange = channel.topic(@params[:exchange], :durable => true) - - @params[:pull_subscribers].each do |handler| - Lolitra::MessageHandlerManager.register_pull_subscriber(handler) - end - rescue => e - Lolitra::logger.debug("error") - Lolitra::log_exception(e) - end - end - end - end - end - - def subscribe(message_class, handler_class) - create_queue(message_class, handler_class, {:exclusive => true, :durable => false}, "") - end - - def pull_subscribe(message_class, handler_class) - create_queue(message_class, handler_class, {:durable => true}) - end - - def publish(message) - #TODO: if exchange channel is closed doesn't log anything - self.exchange.publish(message.marshall, :routing_key => message.class.message_key, :timestamp => Time.now.to_i) - end - - private - def create_channel(connection, &block) - channel = AMQP::Channel.new(connection) do - channel.on_error do |channel, close| - Lolitra::logger.error("Channel error: #{channel}") - Lolitra::logger.error(close) - end - block.call(channel) - end - channel - end - - def create_queue(message_class, handler_class, options) - begin - queue_name = queue_prefix + MessageHandler::Helpers.underscore(handler_class.name) - - create_channel(self.connection) do |channel| - channel.queue(queue_name, options).bind(self.exchange, :routing_key => message_class.message_key) - channel.close - end - - if !@channels[queue_name] #Only one subscriber by queue_name - @channels[queue_name] = create_channel(self.connection) do |channel| - channel.prefetch(1).queue(queue_name, options).subscribe do |info, payload| - begin - Lolitra::logger.debug("Message recived: #{info.routing_key}") - Lolitra::logger.debug("#{payload}") - message_class_tmp = handler_class.handlers[info.routing_key][0] - handler_class.handle(message_class_tmp.unmarshall(payload)) - rescue => e - Lolitra::log_exception(e) - end - end - end - end - rescue => e - Lolitra::log_exception(e) - end - end end end