lib/sidekiq/api.rb in sidekiq-4.2.9 vs lib/sidekiq/api.rb in sidekiq-4.2.10
- old
+ new
@@ -73,11 +73,14 @@
s = pipe1_res[7].size
workers_size = pipe2_res[0...s].map(&:to_i).inject(0, &:+)
enqueued = pipe2_res[s..-1].map(&:to_i).inject(0, &:+)
default_queue_latency = if (entry = pipe1_res[6].first)
- Time.now.to_f - Sidekiq.load_json(entry)['enqueued_at'.freeze]
+ job = Sidekiq.load_json(entry)
+ now = Time.now.to_f
+ thence = job['enqueued_at'.freeze] || now
+ now - thence
else
0
end
@stats = {
processed: pipe1_res[0].to_i,
@@ -223,11 +226,14 @@
def latency
entry = Sidekiq.redis do |conn|
conn.lrange(@rname, -1, -1)
end.first
return 0 unless entry
- Time.now.to_f - Sidekiq.load_json(entry)['enqueued_at']
+ job = Sidekiq.load_json(entry)
+ now = Time.now.to_f
+ thence = job['enqueued_at'] || now
+ now - thence
end
def each
initial_size = size
deleted_size = 0
@@ -349,11 +355,12 @@
def queue
@queue
end
def latency
- Time.now.to_f - (@item['enqueued_at'] || @item['created_at'])
+ now = Time.now.to_f
+ now - (@item['enqueued_at'] || @item['created_at'] || now)
end
##
# Remove this job from the queue.
def delete
@@ -414,24 +421,21 @@
Sidekiq::Client.push(msg)
end
end
def retry
- raise "Retry not available on jobs which have not failed" unless item["failed_at"]
remove_job do |message|
msg = Sidekiq.load_json(message)
- msg['retry_count'] -= 1
+ msg['retry_count'] -= 1 if msg['retry_count']
Sidekiq::Client.push(msg)
end
end
##
# Place job in the dead set
def kill
- raise 'Kill not available on jobs which have not failed' unless item['failed_at']
remove_job do |message|
- Sidekiq.logger.info { "Killing job #{message['jid']}" }
now = Time.now.to_f
Sidekiq.redis do |conn|
conn.multi do
conn.zadd('dead', now, message)
conn.zremrangebyscore('dead', '-inf', now - DeadSet.timeout)
@@ -439,10 +443,14 @@
end
end
end
end
+ def error?
+ !!item['error_class']
+ end
+
private
def remove_job
Sidekiq.redis do |conn|
results = conn.multi do
@@ -583,16 +591,16 @@
##
# Allows enumeration of scheduled jobs within Sidekiq.
# Based on this, you can search/filter for jobs. Here's an
# example where I'm selecting all jobs of a certain type
- # and deleting them from the retry queue.
+ # and deleting them from the schedule queue.
#
# r = Sidekiq::ScheduledSet.new
- # r.select do |retri|
- # retri.klass == 'Sidekiq::Extensions::DelayedClass' &&
- # retri.args[0] == 'User' &&
- # retri.args[1] == 'setup_new_subscriber'
+ # r.select do |scheduled|
+ # scheduled.klass == 'Sidekiq::Extensions::DelayedClass' &&
+ # scheduled.args[0] == 'User' &&
+ # scheduled.args[1] == 'setup_new_subscriber'
# end.map(&:delete)
class ScheduledSet < JobSet
def initialize
super 'schedule'
end