lib/insque.rb in insque-0.3.2 vs lib/insque.rb in insque-0.3.3
- old
+ new
@@ -5,12 +5,13 @@
module Insque
def self.debug= debug
@debug = debug
end
- def self.redis= redis
- @redis = redis
+ def self.redis_config= redis
+ @redis_config = redis
+ @redis = Redis.new @redis_config
@redis.select 7
end
def self.sender= sender
@sender = sender
@@ -35,34 +36,40 @@
keys.each {|k| r.lpush k, value.to_json}
end
end
def self.listen worker_name=''
- @redis.sadd 'insque_inboxes', @inbox
+ redis = Redis.new redis
+ redis.select 7
+
+ redis.sadd 'insque_inboxes', @inbox
log "#{worker_name} START LISTENING #{@inbox}"
loop do
- message = @redis.brpoplpush(@inbox, @processing, 0)
+ message = redis.brpoplpush(@inbox, @processing, 0)
log "#{worker_name} RECEIVING: #{message}" if @debug
begin
parsed_message = JSON.parse message
send(parsed_message['message'], parsed_message) if self.respond_to? parsed_message['message']
rescue => e
log "#{worker_name} ========== BROKEN_MESSAGE: #{message} =========="
log e.inspect
log e.backtrace
end
- @redis.lrem @processing, 0, message
+ redis.lrem @processing, 0, message
end
end
def self.janitor
+ redis = Redis.new redis
+ redis.select 7
+
loop do
- @redis.watch @processing
+ redis.watch @processing
errors = []
restart = []
delete = []
- @redis.lrange(@processing, 0, -1).each do |m|
+ redis.lrange(@processing, 0, -1).each do |m|
begin
parsed_message = JSON.parse(m)
if parsed_message['restarted_at'] && DateTime.parse(parsed_message['restarted_at']) < 1.hour.ago.utc
errors << parsed_message
delete << m
@@ -72,11 +79,11 @@
end
rescue
log "========== JANITOR_BROKEN_MESSAGE: #{m} =========="
end
end
- result = @redis.multi do |r|
+ result = redis.multi do |r|
restart.each {|m| r.lpush @inbox, m.to_json }
delete.each {|m| r.lrem @processing, 0, m }
end
if result
errors.each {|m| log "ERROR: #{m.to_json}" }
@@ -87,10 +94,10 @@
end
sleep(Random.rand * 10)
end
end
- private
+private
def self.log message
print "#{Time.now.utc} #{message}\n"
STDOUT.flush if @debug
end