lib/sidekiq/api.rb in sidekiq-4.2.10 vs lib/sidekiq/api.rb in sidekiq-5.0.0.beta1

- old
+ new

@@ -73,14 +73,11 @@ s = pipe1_res[7].size workers_size = pipe2_res[0...s].map(&:to_i).inject(0, &:+) enqueued = pipe2_res[s..-1].map(&:to_i).inject(0, &:+) default_queue_latency = if (entry = pipe1_res[6].first) - job = Sidekiq.load_json(entry) - now = Time.now.to_f - thence = job['enqueued_at'.freeze] || now - now - thence + Time.now.to_f - Sidekiq.load_json(entry)['enqueued_at'.freeze] else 0 end @stats = { processed: pipe1_res[0].to_i, @@ -226,14 +223,11 @@ def latency entry = Sidekiq.redis do |conn| conn.lrange(@rname, -1, -1) end.first return 0 unless entry - job = Sidekiq.load_json(entry) - now = Time.now.to_f - thence = job['enqueued_at'] || now - now - thence + Time.now.to_f - Sidekiq.load_json(entry)['enqueued_at'] end def each initial_size = size deleted_size = 0 @@ -286,16 +280,20 @@ attr_reader :item attr_reader :value def initialize(item, queue_name=nil) @value = item - @item = item.is_a?(Hash) ? item : Sidekiq.load_json(item) - @queue = queue_name || @item['queue'] + @item = if item.is_a?(Hash) + item + else + Sidekiq.load_json(item) rescue nil + end + @queue = queue_name || self['queue'] end def klass - @item['class'] + self['class'] end def display_class # Unwrap known wrappers so they show up in a human-friendly manner in the Web UI @klass ||= case klass @@ -322,12 +320,12 @@ when /\ASidekiq::Extensions::Delayed/ safe_load(args[0], args) do |_, _, arg| arg end when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper" - job_args = @item['wrapped'] ? args[0]["arguments"] : [] - if 'ActionMailer::DeliveryJob' == (@item['wrapped'] || args[0]) + job_args = self['wrapped'] ? args[0]["arguments"] : [] + if 'ActionMailer::DeliveryJob' == (self['wrapped'] || args[0]) # remove MailerClass, mailer_method and 'deliver_now' job_args.drop(3) else job_args end @@ -335,32 +333,31 @@ args end end def args - @item['args'] + self['args'] end def jid - @item['jid'] + self['jid'] end def enqueued_at - @item['enqueued_at'] ? Time.at(@item['enqueued_at']).utc : nil + self['enqueued_at'] ? Time.at(self['enqueued_at']).utc : nil end def created_at - Time.at(@item['created_at'] || @item['enqueued_at'] || 0).utc + Time.at(self['created_at'] || self['enqueued_at'] || 0).utc end def queue @queue end def latency - now = Time.now.to_f - now - (@item['enqueued_at'] || @item['created_at'] || now) + Time.now.to_f - (self['enqueued_at'] || self['created_at'] || 0) end ## # Remove this job from the queue. def delete @@ -369,11 +366,14 @@ end count != 0 end def [](name) - @item[name] + # nil will happen if the JSON fails to parse. + # We don't guarantee Sidekiq will work with bad job JSON but we should + # make a best effort to minimize the damage. + @item ? @item[name] : nil end private def safe_load(content, default) @@ -421,21 +421,24 @@ Sidekiq::Client.push(msg) end end def retry + raise "Retry not available on jobs which have not failed" unless item["failed_at"] remove_job do |message| msg = Sidekiq.load_json(message) - msg['retry_count'] -= 1 if msg['retry_count'] + msg['retry_count'] -= 1 Sidekiq::Client.push(msg) end end ## # Place job in the dead set def kill + raise 'Kill not available on jobs which have not failed' unless item['failed_at'] remove_job do |message| + Sidekiq.logger.info { "Killing job #{message['jid']}" } now = Time.now.to_f Sidekiq.redis do |conn| conn.multi do conn.zadd('dead', now, message) conn.zremrangebyscore('dead', '-inf', now - DeadSet.timeout) @@ -443,14 +446,10 @@ end end end end - def error? - !!item['error_class'] - end - private def remove_job Sidekiq.redis do |conn| results = conn.multi do @@ -591,17 +590,17 @@ ## # Allows enumeration of scheduled jobs within Sidekiq. # Based on this, you can search/filter for jobs. Here's an # example where I'm selecting all jobs of a certain type - # and deleting them from the schedule queue. + # and deleting them from the retry queue. # # r = Sidekiq::ScheduledSet.new - # r.select do |scheduled| - # scheduled.klass == 'Sidekiq::Extensions::DelayedClass' && - # scheduled.args[0] == 'User' && - # scheduled.args[1] == 'setup_new_subscriber' + # r.select do |retri| + # retri.klass == 'Sidekiq::Extensions::DelayedClass' && + # retri.args[0] == 'User' && + # retri.args[1] == 'setup_new_subscriber' # end.map(&:delete) class ScheduledSet < JobSet def initialize super 'schedule' end @@ -754,10 +753,10 @@ def [](key) @attribs[key] end def quiet! - signal('USR1') + signal('TSTP') end def stop! signal('TERM') end