lib/sidekiq/api.rb in sidekiq-4.2.10 vs lib/sidekiq/api.rb in sidekiq-5.0.0.beta1
- old
+ new
@@ -73,14 +73,11 @@
s = pipe1_res[7].size
workers_size = pipe2_res[0...s].map(&:to_i).inject(0, &:+)
enqueued = pipe2_res[s..-1].map(&:to_i).inject(0, &:+)
default_queue_latency = if (entry = pipe1_res[6].first)
- job = Sidekiq.load_json(entry)
- now = Time.now.to_f
- thence = job['enqueued_at'.freeze] || now
- now - thence
+ Time.now.to_f - Sidekiq.load_json(entry)['enqueued_at'.freeze]
else
0
end
@stats = {
processed: pipe1_res[0].to_i,
@@ -226,14 +223,11 @@
def latency
entry = Sidekiq.redis do |conn|
conn.lrange(@rname, -1, -1)
end.first
return 0 unless entry
- job = Sidekiq.load_json(entry)
- now = Time.now.to_f
- thence = job['enqueued_at'] || now
- now - thence
+ Time.now.to_f - Sidekiq.load_json(entry)['enqueued_at']
end
def each
initial_size = size
deleted_size = 0
@@ -286,16 +280,20 @@
attr_reader :item
attr_reader :value
def initialize(item, queue_name=nil)
@value = item
- @item = item.is_a?(Hash) ? item : Sidekiq.load_json(item)
- @queue = queue_name || @item['queue']
+ @item = if item.is_a?(Hash)
+ item
+ else
+ Sidekiq.load_json(item) rescue nil
+ end
+ @queue = queue_name || self['queue']
end
def klass
- @item['class']
+ self['class']
end
def display_class
# Unwrap known wrappers so they show up in a human-friendly manner in the Web UI
@klass ||= case klass
@@ -322,12 +320,12 @@
when /\ASidekiq::Extensions::Delayed/
safe_load(args[0], args) do |_, _, arg|
arg
end
when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
- job_args = @item['wrapped'] ? args[0]["arguments"] : []
- if 'ActionMailer::DeliveryJob' == (@item['wrapped'] || args[0])
+ job_args = self['wrapped'] ? args[0]["arguments"] : []
+ if 'ActionMailer::DeliveryJob' == (self['wrapped'] || args[0])
# remove MailerClass, mailer_method and 'deliver_now'
job_args.drop(3)
else
job_args
end
@@ -335,32 +333,31 @@
args
end
end
def args
- @item['args']
+ self['args']
end
def jid
- @item['jid']
+ self['jid']
end
def enqueued_at
- @item['enqueued_at'] ? Time.at(@item['enqueued_at']).utc : nil
+ self['enqueued_at'] ? Time.at(self['enqueued_at']).utc : nil
end
def created_at
- Time.at(@item['created_at'] || @item['enqueued_at'] || 0).utc
+ Time.at(self['created_at'] || self['enqueued_at'] || 0).utc
end
def queue
@queue
end
def latency
- now = Time.now.to_f
- now - (@item['enqueued_at'] || @item['created_at'] || now)
+ Time.now.to_f - (self['enqueued_at'] || self['created_at'] || 0)
end
##
# Remove this job from the queue.
def delete
@@ -369,11 +366,14 @@
end
count != 0
end
def [](name)
- @item[name]
+ # nil will happen if the JSON fails to parse.
+ # We don't guarantee Sidekiq will work with bad job JSON but we should
+ # make a best effort to minimize the damage.
+ @item ? @item[name] : nil
end
private
def safe_load(content, default)
@@ -421,21 +421,24 @@
Sidekiq::Client.push(msg)
end
end
def retry
+ raise "Retry not available on jobs which have not failed" unless item["failed_at"]
remove_job do |message|
msg = Sidekiq.load_json(message)
- msg['retry_count'] -= 1 if msg['retry_count']
+ msg['retry_count'] -= 1
Sidekiq::Client.push(msg)
end
end
##
# Place job in the dead set
def kill
+ raise 'Kill not available on jobs which have not failed' unless item['failed_at']
remove_job do |message|
+ Sidekiq.logger.info { "Killing job #{message['jid']}" }
now = Time.now.to_f
Sidekiq.redis do |conn|
conn.multi do
conn.zadd('dead', now, message)
conn.zremrangebyscore('dead', '-inf', now - DeadSet.timeout)
@@ -443,14 +446,10 @@
end
end
end
end
- def error?
- !!item['error_class']
- end
-
private
def remove_job
Sidekiq.redis do |conn|
results = conn.multi do
@@ -591,17 +590,17 @@
##
# Allows enumeration of scheduled jobs within Sidekiq.
# Based on this, you can search/filter for jobs. Here's an
# example where I'm selecting all jobs of a certain type
- # and deleting them from the schedule queue.
+ # and deleting them from the retry queue.
#
# r = Sidekiq::ScheduledSet.new
- # r.select do |scheduled|
- # scheduled.klass == 'Sidekiq::Extensions::DelayedClass' &&
- # scheduled.args[0] == 'User' &&
- # scheduled.args[1] == 'setup_new_subscriber'
+ # r.select do |retri|
+ # retri.klass == 'Sidekiq::Extensions::DelayedClass' &&
+ # retri.args[0] == 'User' &&
+ # retri.args[1] == 'setup_new_subscriber'
# end.map(&:delete)
class ScheduledSet < JobSet
def initialize
super 'schedule'
end
@@ -754,10 +753,10 @@
def [](key)
@attribs[key]
end
def quiet!
- signal('USR1')
+ signal('TSTP')
end
def stop!
signal('TERM')
end