lib/sidekiq/api.rb in sidekiq-4.2.9 vs lib/sidekiq/api.rb in sidekiq-4.2.10

- old
+ new

@@ -73,11 +73,14 @@ s = pipe1_res[7].size workers_size = pipe2_res[0...s].map(&:to_i).inject(0, &:+) enqueued = pipe2_res[s..-1].map(&:to_i).inject(0, &:+) default_queue_latency = if (entry = pipe1_res[6].first) - Time.now.to_f - Sidekiq.load_json(entry)['enqueued_at'.freeze] + job = Sidekiq.load_json(entry) + now = Time.now.to_f + thence = job['enqueued_at'.freeze] || now + now - thence else 0 end @stats = { processed: pipe1_res[0].to_i, @@ -223,11 +226,14 @@ def latency entry = Sidekiq.redis do |conn| conn.lrange(@rname, -1, -1) end.first return 0 unless entry - Time.now.to_f - Sidekiq.load_json(entry)['enqueued_at'] + job = Sidekiq.load_json(entry) + now = Time.now.to_f + thence = job['enqueued_at'] || now + now - thence end def each initial_size = size deleted_size = 0 @@ -349,11 +355,12 @@ def queue @queue end def latency - Time.now.to_f - (@item['enqueued_at'] || @item['created_at']) + now = Time.now.to_f + now - (@item['enqueued_at'] || @item['created_at'] || now) end ## # Remove this job from the queue. def delete @@ -414,24 +421,21 @@ Sidekiq::Client.push(msg) end end def retry - raise "Retry not available on jobs which have not failed" unless item["failed_at"] remove_job do |message| msg = Sidekiq.load_json(message) - msg['retry_count'] -= 1 + msg['retry_count'] -= 1 if msg['retry_count'] Sidekiq::Client.push(msg) end end ## # Place job in the dead set def kill - raise 'Kill not available on jobs which have not failed' unless item['failed_at'] remove_job do |message| - Sidekiq.logger.info { "Killing job #{message['jid']}" } now = Time.now.to_f Sidekiq.redis do |conn| conn.multi do conn.zadd('dead', now, message) conn.zremrangebyscore('dead', '-inf', now - DeadSet.timeout) @@ -439,10 +443,14 @@ end end end end + def error? + !!item['error_class'] + end + private def remove_job Sidekiq.redis do |conn| results = conn.multi do @@ -583,16 +591,16 @@ ## # Allows enumeration of scheduled jobs within Sidekiq. # Based on this, you can search/filter for jobs. Here's an # example where I'm selecting all jobs of a certain type - # and deleting them from the retry queue. + # and deleting them from the schedule queue. # # r = Sidekiq::ScheduledSet.new - # r.select do |retri| - # retri.klass == 'Sidekiq::Extensions::DelayedClass' && - # retri.args[0] == 'User' && - # retri.args[1] == 'setup_new_subscriber' + # r.select do |scheduled| + # scheduled.klass == 'Sidekiq::Extensions::DelayedClass' && + # scheduled.args[0] == 'User' && + # scheduled.args[1] == 'setup_new_subscriber' # end.map(&:delete) class ScheduledSet < JobSet def initialize super 'schedule' end