lib/lolitra/handler_base.rb in lolitra-0.0.5 vs lib/lolitra/handler_base.rb in lolitra-0.1.0b
- old
+ new
@@ -1,14 +1,30 @@
require 'singleton'
+require 'log4r'
require 'amqp'
require 'amqp/utilities/event_loop_helper'
require 'json'
+require 'fiber'
module Lolitra
+ include Log4r
+
+ @@logger = Logger.new 'lolitra'
+ @@logger.outputters = Outputter.stdout
+
+ def self.logger
+ @@logger
+ end
+
+ def self.logger=(new_logger)
+ @@logger = new_logger
+ end
+
module MessageHandler
module Helpers
- def self.underscore(word)
+ def self.underscore(arg)
+ word = arg.dup
word.gsub!(/::/, '/')
word.gsub!(/([A-Z]+)([A-Z][a-z])/,'\1_\2')
word.gsub!(/([a-z\d])([A-Z])/,'\1_\2')
word.tr!("-", "_")
word.downcase!
@@ -102,10 +118,14 @@
def self.included(base)
base.send :extend, MessageHandlerClass
end
+ def publish(message)
+ self.class.publish(message)
+ end
+
def handle(message)
handler_method = self.class.handlers[message.class.message_key][1]
raise "Can't handle message #{message.class}" unless handler_method
self.send(handler_method, message)
end
@@ -163,11 +183,11 @@
def message_key(key = nil)
if (key)
self.class_message_key = key
else
- self.class_message_key || "#{MessageHandler::Helper.underscore(self.class.name)}"
+ self.class_message_key || "#{MessageHandler::Helpers.underscore(self.class.name)}"
end
end
def unmarshall(message_json)
hash = JSON.parse(message_json)
@@ -179,20 +199,44 @@
def self.included(base)
base.send :extend, MessageClass
end
def initialize(hash={})
- hash.each { |key, value| self.send("#{MessageHandler::Helper.underscore(key)}=", value) }
+ hash.each { |key, value| self.send("#{MessageHandler::Helpers.underscore(key)}=", value) }
end
- def marshall
+ def to_hash
hash = {}
- self.instance_variables.each {|var| hash[var.to_s.delete("@")] = self.instance_variable_get(var) }
- JSON.generate(hash)
+ self.instance_variables.each {|var| hash[var.to_s.delete("@").to_sym] = self.instance_variable_get(var) }
+ hash
end
+
+ def marshall
+ JSON.generate(to_hash)
+ end
end
+ class FayeBus
+ def initialize(options = {})
+ EM::next_tick do
+ @socketClient = Faye::Client.new(options[:url] || 'http://localhost:9292/faye')
+ end
+ end
+
+ def subscribe(message_class, handler_class)
+ EM::next_tick do
+ @socketClient.subscribe(message_class.message_key) do |payload|
+ handler_class.handle(message_class.unmarshall(payload))
+ end
+ end
+ end
+
+ def publish(message)
+ @socketClient.publish(message.class.message_key, message.marshall)
+ end
+ end
+
class AmqpBus
attr_accessor :queue_prefix
attr_accessor :connection
attr_accessor :exchange
@@ -215,19 +259,44 @@
def pull_subscribe(message_class, handler_class)
create_queue(message_class, handler_class, {:durable => true}, queue_prefix + MessageHandler::Helpers.underscore(handler_class.name))
end
def publish(message)
+ Lolitra::logger.debug("Message sent: #{message.class.message_key}")
+ Lolitra::logger.debug("#{message.marshall}")
self.exchange.publish(message.marshall, :routing_key => message.class.message_key, :timestamp => Time.now.to_i)
end
private
def create_queue(message_class, handler_class, options, queue_name)
EM.next_tick do
channel = AMQP::Channel.new(self.connection)
- channel.prefetch(1).queue(queue_name, options).bind(self.exchange, :routing_key => message_class.message_key).subscribe do |info, payload|
- message_class_tmp = handler_class.handlers[info.routing_key][0]
- handler_class.handle(message_class_tmp.unmarshall(payload))
+ channel.prefetch(1).queue(queue_name, options).bind(self.exchange, :routing_key => message_class.message_key).subscribe(:ack => true) do |info, payload|
+ Fiber.new do
+ current_fiber = Fiber.current
+ for i in (0..5)
+ begin
+ Lolitra::logger.debug("Message recived: #{info.routing_key}")
+ Lolitra::logger.debug("#{payload}")
+ message_class_tmp = handler_class.handlers[info.routing_key][0]
+ handler_class.handle(message_class_tmp.unmarshall(payload))
+ info.ack
+ Lolitra::logger.debug("Message processed")
+ break
+ rescue => e
+ Lolitra::logger.error("Try #{i}: #{e.message}")
+ if (i!=5)
+ EventMachine.add_timer(5) do
+ current_fiber.resume
+ end
+ Fiber.yield
+ else
+ Lolitra::logger.error(e.backtrace.join("\n\t"))
+ info.reject(:requeue => false)
+ end
+ end
+ end
+ end.resume
end
end
end
end
end