lib/lolitra/handler_base.rb in lolitra-0.2.0 vs lib/lolitra/handler_base.rb in lolitra-0.2.1
- old
+ new
@@ -1,10 +1,12 @@
require 'singleton'
require 'log4r'
require 'amqp'
require 'amqp/utilities/event_loop_helper'
require 'json'
+require 'fileutils'
+require_relative 'rabbitmq_bus'
module Lolitra
include Log4r
@@logger = Logger.new 'lolitra'
@@ -25,10 +27,36 @@
def self.publish(message)
Lolitra::MessageHandlerManager.publish(message)
end
+ def self.unsubscribe(handler_class, &block)
+ Lolitra::MessageHandlerManager.unsubscribe(handler_class, &block)
+ end
+
+ def self.disconnect(&block)
+ Lolitra::MessageHandlerManager.disconnect(&block)
+ end
+
+ def self.subscribers
+ Lolitra::MessageHandlerManager.instance.subscribers.collect do |subscriber|
+ subscriber.name
+ end
+ end
+
+ def self.process_deadletters(subscriber)
+ Lolitra::MessageHandlerManager.instance.process_deadletters(subscriber)
+ end
+
+ def self.remove_next_deadletter(subscriber)
+ Lolitra::MessageHandlerManager.instance.remove_next_deadletter(subscriber)
+ end
+
+ def self.purge_deadletters(subscriber)
+ Lolitra::MessageHandlerManager.instance.purge_deadletters(subscriber)
+ end
+
module MessageHandler
module Helpers
def self.underscore(arg)
word = arg.dup
word.gsub!(/::/, '/')
@@ -63,15 +91,11 @@
base.starters = []
base.is_stateful = false
end
def handle(message)
- begin
- get_handler(message).handle(message)
- rescue => e
- Lolitra::log_exception(e)
- end
+ get_handler(message).handle(message)
end
def publish(message)
#TODO: IoC
MessageHandlerManager.publish(message)
@@ -140,10 +164,11 @@
class MessageHandlerManager
include Singleton
attr_accessor :bus
+ attr_accessor :subscribers
def self.bus=(new_bus)
instance.bus = new_bus
end
@@ -153,24 +178,40 @@
def self.register_subscriber(handler_class)
instance.register_subscriber(handler_class)
end
+ def subscribers
+ @subscribers ||= []
+ end
+
+ def process_deadletters(handler_class)
+ bus.process_deadletters(handler_class)
+ end
+
+ def purge_deadletters(handler_class)
+ bus.purge_deadletters(handler_class)
+ end
+
+ def remove_next_deadletter(handler_class)
+ bus.remove_next_deadletter(handler_class)
+ end
+
def register_subscriber(handler_class)
+ subscribers << handler_class
handler_class.handle_messages.each do |message_class|
bus.subscribe(message_class, handler_class)
end
end
- def self.register_pull_subscriber(handler_class)
- instance.register_pull_subscriber(handler_class)
+ def self.unsubscribe(handler_class, &block)
+ instance.unsubscribe(handler_class, &block)
end
-
- def register_pull_subscriber(handler_class)
- handler_class.handle_messages.each do |message_class|
- bus.pull_subscribe(message_class, handler_class)
- end
+
+ def unsubscribe(handler_class, &block)
+ Lolitra::logger.info("Unsubscribing #{handler_class}")
+ bus.unsubscribe(handler_class, &block)
end
def self.publish(message)
instance.publish(message)
end
@@ -178,10 +219,18 @@
def publish(message)
Lolitra::logger.debug("Message sent: #{message.class.message_key}")
Lolitra::logger.debug("#{message.marshall}")
bus.publish(message)
end
+
+ def self.disconnect(&block)
+ instance.disconnect(&block)
+ end
+
+ def disconnect(&block)
+ bus.disconnect(&block)
+ end
end
module Message
module MessageClass
@@ -238,92 +287,7 @@
end
def publish(message)
@socketClient.publish(message.class.message_key, message.marshall)
end
- end
-
- class AmqpBus
- attr_accessor :queue_prefix
- attr_accessor :connection
- attr_accessor :exchange
-
- def initialize(hash = {})
- Lolitra::MessageHandlerManager.bus = self
-
- @channels = {}
- @params = hash.reject { |key, value| !value }
- raise "no :exchange specified" unless hash[:exchange]
-
- self.queue_prefix = hash[:queue_prefix]||""
- AMQP::Utilities::EventLoopHelper.run do
- self.connection = AMQP.start(@params) do |connection|
- channel = create_channel(connection) do |channel|
- begin
- self.exchange = channel.topic(@params[:exchange], :durable => true)
-
- @params[:pull_subscribers].each do |handler|
- Lolitra::MessageHandlerManager.register_pull_subscriber(handler)
- end
- rescue => e
- Lolitra::logger.debug("error")
- Lolitra::log_exception(e)
- end
- end
- end
- end
- end
-
- def subscribe(message_class, handler_class)
- create_queue(message_class, handler_class, {:exclusive => true, :durable => false}, "")
- end
-
- def pull_subscribe(message_class, handler_class)
- create_queue(message_class, handler_class, {:durable => true})
- end
-
- def publish(message)
- #TODO: if exchange channel is closed doesn't log anything
- self.exchange.publish(message.marshall, :routing_key => message.class.message_key, :timestamp => Time.now.to_i)
- end
-
- private
- def create_channel(connection, &block)
- channel = AMQP::Channel.new(connection) do
- channel.on_error do |channel, close|
- Lolitra::logger.error("Channel error: #{channel}")
- Lolitra::logger.error(close)
- end
- block.call(channel)
- end
- channel
- end
-
- def create_queue(message_class, handler_class, options)
- begin
- queue_name = queue_prefix + MessageHandler::Helpers.underscore(handler_class.name)
-
- create_channel(self.connection) do |channel|
- channel.queue(queue_name, options).bind(self.exchange, :routing_key => message_class.message_key)
- channel.close
- end
-
- if !@channels[queue_name] #Only one subscriber by queue_name
- @channels[queue_name] = create_channel(self.connection) do |channel|
- channel.prefetch(1).queue(queue_name, options).subscribe do |info, payload|
- begin
- Lolitra::logger.debug("Message recived: #{info.routing_key}")
- Lolitra::logger.debug("#{payload}")
- message_class_tmp = handler_class.handlers[info.routing_key][0]
- handler_class.handle(message_class_tmp.unmarshall(payload))
- rescue => e
- Lolitra::log_exception(e)
- end
- end
- end
- end
- rescue => e
- Lolitra::log_exception(e)
- end
- end
end
end