lib/insque.rb in insque-0.6.1 vs lib/insque.rb in insque-0.6.2

- old
+ new

@@ -1,20 +1,30 @@ require "insque/version" require "redis" +require 'json' require "insque/railtie" if defined?(Rails) module Insque DEFAULT_INBOX_TTL = 10800 # seconds + DEFAULT_PROCESSING_TTL = 3600 # seconds def self.inbox_ttl= val @inbox_ttl = val end def self.inbox_ttl @inbox_ttl || DEFAULT_INBOX_TTL end + def self.processing_ttl= val + @processing_ttl = val + end + + def self.processing_ttl + @processing_ttl || DEFAULT_PROCESSING_TTL + end + def self.debug= debug @debug = debug end def self.redis= redis @@ -76,11 +86,11 @@ def self.slow_listen worker_name='', redis=nil do_listen @slow_inbox, @slow_processing, (redis || create_redis_connection), worker_name end def self.janitor redis=nil - real_janitor @inbox, @processing, (redis || create_redis_connection) + real_janitor @inbox, @processing, (redis || create_redis_connection), @inbox_pointer end def self.slow_janitor redis=nil real_janitor @slow_inbox, @slow_processing, (redis || create_redis_connection) end @@ -89,42 +99,46 @@ def self.do_listen inbox, processing, redis, worker_name, pointer=nil log "#{worker_name} START LISTENING #{inbox}" loop do redis.setex(pointer, inbox_ttl, inbox) if pointer message = redis.brpoplpush(inbox, processing, 0) - log "#{worker_name} RECEIVING: #{message}" if @debug begin + log "#{worker_name} RECEIVING: #{message}" if @debug parsed_message = JSON.parse message send(parsed_message['message'], parsed_message) rescue NoMethodError rescue => e log "#{worker_name} ========== BROKEN_MESSAGE: #{message} ==========" log e.inspect log e.backtrace + ensure + redis.lrem processing, 0, message end - redis.lrem processing, 0, message end end - def self.real_janitor inbox, processing, redis + def self.real_janitor inbox, processing, redis, pointer=nil loop do + redis.setex(pointer, inbox_ttl, inbox) if pointer redis.watch processing errors = [] restart = [] delete = [] redis.lrange(processing, 0, -1).each do |m| begin parsed_message = JSON.parse(m) - if parsed_message['restarted_at'] && DateTime.parse(parsed_message['restarted_at']) < 1.hour.ago.utc + if parsed_message['restarted_at'] && Time.now.to_i - Time.parse(parsed_message['restarted_at']).to_i > processing_ttl errors << parsed_message delete << m - elsif DateTime.parse(parsed_message['broadcasted_at']) < 1.hour.ago.utc + elsif Time.now.to_i - Time.parse(parsed_message['broadcasted_at']).to_i > processing_ttl restart << parsed_message.merge(restarted_at: Time.now.utc) delete << m end - rescue + rescue => e log "========== JANITOR_BROKEN_MESSAGE: #{m} ==========" + log e.inspect + log e.backtrace end end result = redis.multi do |r| restart.each {|m| r.lpush inbox, m.to_json } delete.each {|m| r.lrem processing, 0, m } @@ -134,11 +148,11 @@ restart.each {|m| log "RESTART: #{m.to_json}" } log "CLEANING #{inbox} SUCCESSFULL" else log "CLEANING #{inbox} FAILED" end - sleep(Random.rand * 300) + sleep(Random.rand((inbox_ttl.to_f / 10).ceil) + 1) end end def self.create_redis_connection (@redis_class || Redis).new @redis_config @@ -159,13 +173,13 @@ if defined?(ActiveRecord::Base) class ActiveRecord::Base def send_later(method, *args) Insque.broadcast :send_later, { class: self.class.name, id: id, method: method, args: args }, :slow end + def self.acts_as_insque_crud(*args) options = args.extract_options! excluded = (options[:exclude] || []).map(&:to_s) - set_callback :commit, :after do action = [:create, :update, :destroy].map {|a| a if transaction_include_any_action?([a]) }.compact.first params = self.serializable_hash(options).delete_if {|key| (['created_at', 'updated_at'] + excluded).include? key} Insque.broadcast :"#{self.class.to_s.underscore}_#{action}", params end