lib/insque.rb in insque-0.3.2 vs lib/insque.rb in insque-0.3.3

- old
+ new

@@ -5,12 +5,13 @@ module Insque def self.debug= debug @debug = debug end - def self.redis= redis - @redis = redis + def self.redis_config= redis + @redis_config = redis + @redis = Redis.new @redis_config @redis.select 7 end def self.sender= sender @sender = sender @@ -35,34 +36,40 @@ keys.each {|k| r.lpush k, value.to_json} end end def self.listen worker_name='' - @redis.sadd 'insque_inboxes', @inbox + redis = Redis.new redis + redis.select 7 + + redis.sadd 'insque_inboxes', @inbox log "#{worker_name} START LISTENING #{@inbox}" loop do - message = @redis.brpoplpush(@inbox, @processing, 0) + message = redis.brpoplpush(@inbox, @processing, 0) log "#{worker_name} RECEIVING: #{message}" if @debug begin parsed_message = JSON.parse message send(parsed_message['message'], parsed_message) if self.respond_to? parsed_message['message'] rescue => e log "#{worker_name} ========== BROKEN_MESSAGE: #{message} ==========" log e.inspect log e.backtrace end - @redis.lrem @processing, 0, message + redis.lrem @processing, 0, message end end def self.janitor + redis = Redis.new redis + redis.select 7 + loop do - @redis.watch @processing + redis.watch @processing errors = [] restart = [] delete = [] - @redis.lrange(@processing, 0, -1).each do |m| + redis.lrange(@processing, 0, -1).each do |m| begin parsed_message = JSON.parse(m) if parsed_message['restarted_at'] && DateTime.parse(parsed_message['restarted_at']) < 1.hour.ago.utc errors << parsed_message delete << m @@ -72,11 +79,11 @@ end rescue log "========== JANITOR_BROKEN_MESSAGE: #{m} ==========" end end - result = @redis.multi do |r| + result = redis.multi do |r| restart.each {|m| r.lpush @inbox, m.to_json } delete.each {|m| r.lrem @processing, 0, m } end if result errors.each {|m| log "ERROR: #{m.to_json}" } @@ -87,10 +94,10 @@ end sleep(Random.rand * 10) end end - private +private def self.log message print "#{Time.now.utc} #{message}\n" STDOUT.flush if @debug end