lib/insque.rb in insque-0.6.1 vs lib/insque.rb in insque-0.6.2
- old
+ new
@@ -1,20 +1,30 @@
require "insque/version"
require "redis"
+require 'json'
require "insque/railtie" if defined?(Rails)
module Insque
DEFAULT_INBOX_TTL = 10800 # seconds
+ DEFAULT_PROCESSING_TTL = 3600 # seconds
def self.inbox_ttl= val
@inbox_ttl = val
end
def self.inbox_ttl
@inbox_ttl || DEFAULT_INBOX_TTL
end
+ def self.processing_ttl= val
+ @processing_ttl = val
+ end
+
+ def self.processing_ttl
+ @processing_ttl || DEFAULT_PROCESSING_TTL
+ end
+
def self.debug= debug
@debug = debug
end
def self.redis= redis
@@ -76,11 +86,11 @@
def self.slow_listen worker_name='', redis=nil
do_listen @slow_inbox, @slow_processing, (redis || create_redis_connection), worker_name
end
def self.janitor redis=nil
- real_janitor @inbox, @processing, (redis || create_redis_connection)
+ real_janitor @inbox, @processing, (redis || create_redis_connection), @inbox_pointer
end
def self.slow_janitor redis=nil
real_janitor @slow_inbox, @slow_processing, (redis || create_redis_connection)
end
@@ -89,42 +99,46 @@
def self.do_listen inbox, processing, redis, worker_name, pointer=nil
log "#{worker_name} START LISTENING #{inbox}"
loop do
redis.setex(pointer, inbox_ttl, inbox) if pointer
message = redis.brpoplpush(inbox, processing, 0)
- log "#{worker_name} RECEIVING: #{message}" if @debug
begin
+ log "#{worker_name} RECEIVING: #{message}" if @debug
parsed_message = JSON.parse message
send(parsed_message['message'], parsed_message)
rescue NoMethodError
rescue => e
log "#{worker_name} ========== BROKEN_MESSAGE: #{message} =========="
log e.inspect
log e.backtrace
+ ensure
+ redis.lrem processing, 0, message
end
- redis.lrem processing, 0, message
end
end
- def self.real_janitor inbox, processing, redis
+ def self.real_janitor inbox, processing, redis, pointer=nil
loop do
+ redis.setex(pointer, inbox_ttl, inbox) if pointer
redis.watch processing
errors = []
restart = []
delete = []
redis.lrange(processing, 0, -1).each do |m|
begin
parsed_message = JSON.parse(m)
- if parsed_message['restarted_at'] && DateTime.parse(parsed_message['restarted_at']) < 1.hour.ago.utc
+ if parsed_message['restarted_at'] && Time.now.to_i - Time.parse(parsed_message['restarted_at']).to_i > processing_ttl
errors << parsed_message
delete << m
- elsif DateTime.parse(parsed_message['broadcasted_at']) < 1.hour.ago.utc
+ elsif Time.now.to_i - Time.parse(parsed_message['broadcasted_at']).to_i > processing_ttl
restart << parsed_message.merge(restarted_at: Time.now.utc)
delete << m
end
- rescue
+ rescue => e
log "========== JANITOR_BROKEN_MESSAGE: #{m} =========="
+ log e.inspect
+ log e.backtrace
end
end
result = redis.multi do |r|
restart.each {|m| r.lpush inbox, m.to_json }
delete.each {|m| r.lrem processing, 0, m }
@@ -134,11 +148,11 @@
restart.each {|m| log "RESTART: #{m.to_json}" }
log "CLEANING #{inbox} SUCCESSFULL"
else
log "CLEANING #{inbox} FAILED"
end
- sleep(Random.rand * 300)
+ sleep(Random.rand((inbox_ttl.to_f / 10).ceil) + 1)
end
end
def self.create_redis_connection
(@redis_class || Redis).new @redis_config
@@ -159,13 +173,13 @@
if defined?(ActiveRecord::Base)
class ActiveRecord::Base
def send_later(method, *args)
Insque.broadcast :send_later, { class: self.class.name, id: id, method: method, args: args }, :slow
end
+
def self.acts_as_insque_crud(*args)
options = args.extract_options!
excluded = (options[:exclude] || []).map(&:to_s)
-
set_callback :commit, :after do
action = [:create, :update, :destroy].map {|a| a if transaction_include_any_action?([a]) }.compact.first
params = self.serializable_hash(options).delete_if {|key| (['created_at', 'updated_at'] + excluded).include? key}
Insque.broadcast :"#{self.class.to_s.underscore}_#{action}", params
end