require 'resque' require 'redisk' require 'uuid' module Resque # Resque::Status is a Hash object that has helper methods for dealing with # the common status attributes. It also has a number of class methods for # creating/updating/retrieving status objects from Redis class Status < Hash VERSION = '0.1.1' extend Resque::Helpers # Create a status, generating a new UUID, passing the message to the status # Returns the UUID of the new status. def self.create(message = nil) uuid = generate_uuid set(uuid, message) redis.zadd(set_key, Time.now.to_i, uuid) redis.zremrangebyscore(set_key, 0, Time.now.to_i - @expire_in) if @expire_in uuid end # Get a status by UUID. Returns a Resque::Status def self.get(uuid) val = redis.get(status_key(uuid)) val ? Resque::Status.new(uuid, decode(val)) : nil end # set a status by UUID. messages can be any number of stirngs or hashes # that are merged in order to create a single status. def self.set(uuid, *messages) val = Resque::Status.new(uuid, *messages) redis.set(status_key(uuid), encode(val)) if expire_in redis.expire(status_key(uuid), expire_in) end val end def self.clear status_ids.each do |id| redis.del(status_key(id)) redis.zrem(set_key, id) end end # returns a Redisk::Logger scoped to the UUID. Any options passed are passed # to the logger initialization. def self.logger(uuid, options = {}) Redisk::Logger.new(logger_key(uuid), options) end # Return num Resque::Status objects in reverse chronological order. # By default returns the entire set. def self.statuses(num = -1) status_ids(num).collect do |id| get(id) end.compact end # Return the num most recent status/job UUIDs in reverse chronological order. def self.status_ids(num = -1) redis.zrevrange set_key, 0, num end # Kill the job at UUID on its next iteration this works by adding the UUID to a # kill list (a.k.a. a list of jobs to be killed. Each iteration the job checks # if it _should_ be killed by calling tick or at. If so, it raises # a Resque::JobWithStatus::Killed error and sets the status to 'killed'. def self.kill(uuid) redis.sadd(kill_key, uuid) end # Remove the job at UUID from the kill list def self.killed(uuid) redis.srem(kill_key, uuid) end # Return the UUIDs of the jobs on the kill list def self.kill_ids redis.smembers(kill_key) end # Check whether a job with UUID is on the kill list def self.should_kill?(uuid) redis.sismember(kill_key, uuid) end # The time in seconds that jobs and statuses should expire from Redis (after # the last time they are touched/updated) def self.expire_in @expire_in end # Set the expire_in time in seconds def self.expire_in=(seconds) @expire_in = seconds.nil? ? nil : seconds.to_i end def self.status_key(uuid) "status:#{uuid}" end def self.set_key "_statuses" end def self.kill_key "_kill" end def self.logger_key(uuid) "_log:#{uuid}" end def self.generate_uuid UUID.generate(:compact) end def self.hash_accessor(name, options = {}) options[:default] ||= nil coerce = options[:coerce] ? ".#{options[:coerce]}" : "" module_eval <<-EOT def #{name} value = (self['#{name}'] ? self['#{name}']#{coerce} : #{options[:default].inspect}) yield value if block_given? value end def #{name}=(value) self['#{name}'] = value end def #{name}? !!self['#{name}'] end EOT end STATUSES = %w{queued working completed failed killed}.freeze hash_accessor :uuid hash_accessor :name hash_accessor :status hash_accessor :message hash_accessor :time hash_accessor :num hash_accessor :total # Create a new Resque::Status object. If multiple arguments are passed # it is assumed the first argument is the UUID and the rest are status objects. # All arguments are subsequentily merged in order. Strings are assumed to # be messages. def initialize(*args) super nil base_status = { 'time' => Time.now.to_i, 'status' => 'queued' } base_status['uuid'] = args.shift if args.length > 1 status_hash = args.inject(base_status) do |final, m| m = {'message' => m} if m.is_a?(String) final.merge(m || {}) end self.replace(status_hash) end # calculate the % completion of the job based on status, num # and total def pct_complete case status when 'completed' then 100 when 'queued' then 0 when 'failed' then 100 else t = (total == 0 || total.nil?) ? 1 : total (((num || 0).to_f / t.to_f) * 100).to_i end end # Return the time of the status initialization. If set returns a Time # object, otherwise returns nil def time time? ? Time.at(self['time']) : nil end STATUSES.each do |status| define_method("#{status}?") do self['status'] === status end end # Can the job be killed? 'failed', 'completed', and 'killed' jobs cant be killed # (for pretty obvious reasons) def killable? !['failed', 'completed', 'killed'].include?(self.status) end # Return a JSON representation of the current object. def to_json h = self.dup h['pct_complete'] = pct_complete self.class.encode(h) end def inspect "#" end end end