module Resque
module Plugins
module Status
# OVERRIDE of the include hook so that ClassOverridesAndExtensions is added to
# the including class as well.
#
# `super` cannot be called here, so the ClassMethods that resque-status would
# normally add are extended explicitly. ClassOverridesAndExtensions is extended
# afterwards, so its definitions take precedence on `base`.
def self.included(base)
  # Runs with this module as the receiver, so the reader is defined on the
  # module itself and is available to everything that includes it.
  attr_reader :worker
  base.extend(ClassMethods).extend(ClassOverridesAndExtensions)
end
module ClassOverridesAndExtensions
# OVERRIDE to give the job a display name on the status page from the moment
# it is queued.
#
# The name is replaced once set_status runs (which happens on #tick) with the
# value of the job's own name method, but until then the UI name field would be
# blank, so a placeholder built from the class name and the options is stored.
#
# Returns the generated uuid when Resque accepts the job; otherwise the status
# entry is removed again and nil is returned.
def enqueue_to(queue, klass, options = {})
  status_store = Resque::Plugins::Status::Hash
  uuid = status_store.generate_uuid
  initial_status = { name: "#{name}: #{options.inspect}" }.merge(options)
  status_store.create(uuid, initial_status)
  return uuid if Resque.enqueue_to(queue, klass, uuid, options)

  # Enqueueing was refused: do not leave an orphaned status entry behind.
  status_store.remove(uuid)
  nil
end
# Entry point called by Resque::Worker when processing jobs. Builds a new
# instance of the job class from the uuid and options, runs it through
# safe_perform! and returns the instance.
#
# Do not override this method; override the perform instance method instead.
#
# OVERRIDE: when a block is given, its result is passed to the new instance as
# the worker. A uuid is generated when none is supplied.
def perform(uuid = nil, options = {})
  uuid ||= Resque::Plugins::Status::Hash.generate_uuid
  worker = block_given? ? yield : nil
  new(uuid, worker, options).tap { |job| job.safe_perform! }
end
# OVERRIDE to clear every key that ends with the UUID (status, counters, etc.).
#
# The uuid is removed from the sorted set stored under Hash.set_key, then each
# key matching "*<uuid>" is deleted one at a time.
# NOTE(review): KEYS is a pattern scan over the keyspace; confirm that is
# acceptable for the size of the Redis database this runs against.
def remove(uuid)
  redis = Resque.redis
  redis.zrem(Resque::Plugins::Status::Hash.set_key, uuid)
  redis.keys("*#{uuid}").each { |key| redis.del(key) }
end
# When several workers run at once and need a shared incrementer, the status'
# num attribute cannot be used because of race conditions. Use a counter and
# call incr on it instead.
#
# Builds the Redis key for a counter: "<counter>:<uuid>".
def counter_key(counter, uuid)
  format('%s:%s', counter, uuid)
end
# Reads the named counter for the job identified by uuid.
# Whatever Redis returns is coerced with to_i, so a nil value reads as 0.
def counter(counter, uuid)
  stored = Resque.redis[counter_key(counter, uuid)]
  stored.to_i
end
# Increments the named counter for the job identified by uuid and returns the
# value reported by Redis INCR.
#
# When Resque::Plugins::Status::Hash.expire_in is set, the counter key's expiry
# is refreshed to that value on every increment.
def incr_counter(counter, uuid)
  redis = Resque.redis
  key = counter_key(counter, uuid)
  value = redis.incr(key)
  ttl = Resque::Plugins::Status::Hash.expire_in
  redis.expire(key, ttl) if ttl
  value
end
end
# Sets the status of the job for the current iteration. Use the at method
# instead if you have actual numbers to track the iteration count.
#
# The job is killed (via kill!) when should_kill? is true or when its stored
# status is already 'killed'.
#
# If the worker running the job is paused, this blocks: the status is set to
# 'paused' and the worker is re-checked every 60 seconds until it is no longer
# paused. Afterwards the status is put back to 'working' unless it reads as
# completed, paused or killed at that point.
#
# The status record can be absent (status returns nil), so every read of it is
# nil-guarded.
def tick(*messages)
  kill! if should_kill?
  snapshot = status
  kill! if snapshot && snapshot.killed?

  set_status({'status' => 'working'}, *messages)

  # There is no worker when the job runs inline.
  job_worker = worker
  return unless job_worker && job_worker.paused?

  loop do
    # May need to be repeated, because other workers in a chained job could
    # still be doing work and updating the status.
    snapshot = status
    pause! unless snapshot && snapshot.paused?
    break unless job_worker.paused?
    sleep 60
  end

  snapshot = status
  settled = snapshot && (snapshot.completed? || snapshot.paused? || snapshot.killed?)
  set_status({'status' => 'working'}, *messages) unless settled
end
# Marks the current job as paused: sets the status to 'paused' together with a
# message naming the worker and the current time.
def pause!
  paused_note = "#{worker} paused at #{Time.now}"
  payload = { 'status' => 'paused', 'message' => paused_note }
  set_status(payload)
end
# Creates a new job instance for the given uuid and options.
# OVERRIDE to also accept the worker running the job (nil by default).
def initialize(uuid, worker = nil, options = {})
  @uuid, @options, @worker = uuid, options, worker
end
# Run by the Resque::Worker when processing this job. It wraps the perform
# method, ensuring that the final status of the job is set regardless of error.
#
# OVERRIDE: the job is not run when should_kill? is true or when its status is
# already 'killed'. The parent job may have been killed, so all child jobs
# should die as well.
#
# Outcomes:
# * perform left a failed status: on_failure(status.message) is called, if defined.
# * otherwise: completed is called unless the status is already completed,
#   then on_success, if defined.
# * Killed is raised: the status is marked killed, then on_killed is called,
#   if defined.
# * any other exception: it is logged and the status is set to failed. With an
#   on_failure hook the hook is called and the error is considered handled;
#   without one the error is re-raised. SignalException and SystemExit are
#   always re-raised after the bookkeeping, so that a signal or exit request is
#   never swallowed by the hook.
def safe_perform!
  kill_requested = should_kill?
  kill! if kill_requested
  return if kill_requested || (status && status.killed?)

  set_status({'status' => 'working'})
  perform

  outcome = status
  if outcome && outcome.failed?
    on_failure(outcome.message) if respond_to?(:on_failure)
    return
  end

  completed if outcome && !outcome.completed?
  on_success if respond_to?(:on_success)
rescue Killed
  Rails.logger.info "Job #{self} Killed at #{Time.now}"
  Resque::Plugins::Status::Hash.killed(uuid)
  on_killed if respond_to?(:on_killed)
rescue Exception => e
  # Exception (not just StandardError) is rescued on purpose so that the
  # failure is recorded in the status for every kind of error.
  Rails.logger.error e
  failed("The task failed because of an error: #{e}")
  raise e unless respond_to?(:on_failure)

  hook_result = on_failure(e)
  # Process-control exceptions must keep propagating even when a hook exists.
  raise e if e.is_a?(SignalException) || e.is_a?(SystemExit)
  hook_result
end
# Sets a message for the job on the overview page. It can be set repeatedly
# during the job's processing to indicate what the job is doing.
#
# The message is forwarded to the worker; nothing happens when there is no
# worker, which is the case when the job is run inline.
def overview_message=(message)
  active_worker = worker
  return unless active_worker

  active_worker.overview_message = message
end
# Increments the named counter for this job by delegating to the class-level
# incr_counter with this job's uuid.
def incr_counter(counter)
  job_class = self.class
  job_class.incr_counter(counter, uuid)
end
# Reads the named counter for this job by delegating to the class-level
# counter with this job's uuid.
def counter(counter)
  job_class = self.class
  job_class.counter(counter, uuid)
end
end
end
end