require 'resque'
require 'redisk'
require 'uuid'

module Resque
  # Resque::Status is a Hash object that has helper methods for dealing with
  # the common status attributes. It also has a number of class methods for
  # creating/updating/retrieving status objects from Redis.
  #
  # NOTE(review): the class methods call `redis`, `encode` and `decode`, which
  # are not defined in this file; they are presumably supplied by the
  # `extend Resque::Helpers` below -- confirm against the resque version in use.
  class Status < Hash
    VERSION = '0.2.1'

    extend Resque::Helpers

    # Create a status, generating a new UUID and passing the message to the
    # status. The UUID is also added to the sorted set of known statuses,
    # scored by the current time; when an expiry is configured, entries of
    # that set scored before (now - expire_in) are removed.
    #
    # Returns the UUID of the new status.
    def self.create(message = nil)
      uuid = generate_uuid
      set(uuid, message)
      # Use one timestamp for both the score and the trim threshold.
      now = Time.now.to_i
      redis.zadd(set_key, now, uuid)
      redis.zremrangebyscore(set_key, 0, now - expire_in) if expire_in
      uuid
    end

    # Get a status by UUID. Returns a Resque::Status, or nil when nothing is
    # stored under that UUID.
    def self.get(uuid)
      raw = redis.get(status_key(uuid))
      raw ? Resque::Status.new(uuid, decode(raw)) : nil
    end

    # Set a status by UUID. Messages can be any number of strings or hashes
    # that are merged in order to create a single status. When an expiry is
    # configured, it is (re)applied to the stored key.
    #
    # Returns the Resque::Status that was stored.
    def self.set(uuid, *messages)
      # The leading empty hash guarantees the constructor always receives more
      # than one argument, so the UUID is recognised as the UUID (and not
      # mistaken for a message string) even when no messages are given.
      status = Resque::Status.new(uuid, {}, *messages)
      key = status_key(uuid)
      redis.set(key, encode(status))
      redis.expire(key, expire_in) if expire_in
      status
    end

    # Clear statuses from Redis, passing an optional range. See `status_ids`
    # for info about ranges. Both the stored status and its entry in the set
    # of known statuses are removed.
    def self.clear(range_start = nil, range_end = nil)
      status_ids(range_start, range_end).each do |id|
        redis.del(status_key(id))
        redis.zrem(set_key, id)
      end
    end

    # Returns a Redisk::Logger scoped to the UUID. Any options passed are
    # passed to the logger initialization.
    #
    # Ensures that Redisk is logging to the same Redis connection as Resque.
    def self.logger(uuid, options = {})
      Redisk.redis = redis
      Redisk::Logger.new(logger_key(uuid), options)
    end

    # Number of UUIDs currently held in the set of known statuses.
    def self.count
      redis.zcard(set_key)
    end

    # Return Resque::Status objects in reverse chronological order.
    # By default returns the entire set. UUIDs whose status can no longer be
    # fetched are skipped.
    #
    # @param [Numeric] range_start The optional starting range
    # @param [Numeric] range_end The optional ending range
    # @example returning the last 20 statuses
    #   Resque::Status.statuses(0, 20)
    def self.statuses(range_start = nil, range_end = nil)
      status_ids(range_start, range_end).map { |id| get(id) }.compact
    end

    # Return the most recent status/job UUIDs in reverse chronological order
    # (newest first).
    #
    # With no range (or only one bound) every known UUID is returned. With
    # both bounds, the range is the half-open interval [range_start, range_end)
    # of offsets counted from the newest entry, so (0, 20) is the 20 newest
    # and (20, 40) the 20 after those. The sign of either bound is ignored.
    #
    # @param [Numeric] range_start The optional starting offset (inclusive)
    # @param [Numeric] range_end The optional ending offset (exclusive)
    # @return [Array] UUIDs, newest first; empty when the range is empty
    def self.status_ids(range_start = nil, range_end = nil)
      if range_start && range_end
        # ZREVRANGE already orders from the highest score (newest) down and
        # its stop index is inclusive, hence the - 1 and no extra reversing.
        first = range_start.abs
        last = range_end.abs - 1
        # Guard explicitly: a stop of -1 would otherwise mean "to the end".
        return [] if last < first

        redis.zrevrange(set_key, first, last) || []
      else
        redis.zrevrange(set_key, 0, -1) || []
      end
    end

    # Kill the job at UUID on its next iteration. This works by adding the
    # UUID to a kill list (a.k.a. a list of jobs to be killed). Each iteration
    # the job checks if it _should_ be killed by calling tick or at. If so, it
    # raises a Resque::JobWithStatus::Killed error and sets the status to
    # 'killed'.
    def self.kill(uuid)
      redis.sadd(kill_key, uuid)
    end

    # Remove the job at UUID from the kill list.
    def self.killed(uuid)
      redis.srem(kill_key, uuid)
    end

    # Return the UUIDs of the jobs on the kill list.
    def self.kill_ids
      redis.smembers(kill_key)
    end

    # Check whether a job with UUID is on the kill list.
    def self.should_kill?(uuid)
      redis.sismember(kill_key, uuid)
    end

    # The time in seconds that jobs and statuses should expire from Redis
    # (after the last time they are touched/updated). nil when unset.
    def self.expire_in
      @expire_in
    end

    # Set the expire_in time in seconds. nil clears it; any other value is
    # converted with to_i.
    def self.expire_in=(seconds)
      @expire_in = seconds.nil? ? nil : seconds.to_i
    end

    # Redis key under which the status for +uuid+ is stored.
    def self.status_key(uuid)
      "status:#{uuid}"
    end

    # Redis key of the sorted set holding all known status UUIDs.
    def self.set_key
      "_statuses"
    end

    # Redis key of the set holding the UUIDs of jobs to be killed.
    def self.kill_key
      "_kill"
    end

    # Redis key used for the Redisk logger of +uuid+.
    def self.logger_key(uuid)
      "_log:#{uuid}"
    end

    # Generate a new compact UUID string.
    def self.generate_uuid
      UUID.generate(:compact)
    end

    # Define a reader, a writer and a predicate for the string key +name+.
    #
    # The reader returns the stored value (optionally with the :coerce method
    # applied) or the :default when the stored value is nil/false; when given
    # a block it also yields that value. The predicate reports whether the
    # stored value is truthy.
    #
    # @param [Symbol, String] name The hash key / method name
    # @param [Hash] options :default (fallback value, embedded via #inspect)
    #   and :coerce (name of a method called on the stored value)
    def self.hash_accessor(name, options = {})
      # Read the default without mutating the caller's options hash.
      default = options[:default]
      coerce = options[:coerce] ? ".#{options[:coerce]}" : ""
      module_eval <<-EOT
        def #{name}
          value = (self['#{name}'] ? self['#{name}']#{coerce} : #{default.inspect})
          yield value if block_given?
          value
        end

        def #{name}=(value)
          self['#{name}'] = value
        end

        def #{name}?
          !!self['#{name}']
        end
      EOT
    end

    STATUSES = %w{queued working completed failed killed}.freeze

    hash_accessor :uuid
    hash_accessor :name
    hash_accessor :status
    hash_accessor :message
    hash_accessor :time
    hash_accessor :num
    hash_accessor :total

    # Create a new Resque::Status object. If multiple arguments are passed
    # it is assumed the first argument is the UUID and the rest are status
    # objects. All arguments are subsequently merged in order, on top of a
    # base of the current time and a 'queued' status. Strings are assumed to
    # be messages; nil arguments are ignored.
    def initialize(*args)
      super(nil)
      base_status = {
        'time' => Time.now.to_i,
        'status' => 'queued'
      }
      base_status['uuid'] = args.shift if args.length > 1
      status_hash = args.inject(base_status) do |merged, part|
        part = { 'message' => part } if part.is_a?(String)
        merged.merge(part || {})
      end
      replace(status_hash)
    end

    # Calculate the % completion of the job based on status, num and total.
    # 'completed' is always 100 and 'queued' always 0; otherwise it is
    # num / total as a truncated integer percentage, with a missing or zero
    # total treated as 1 to avoid dividing by zero.
    def pct_complete
      case status
      when 'completed' then 100
      when 'queued' then 0
      else
        t = (total == 0 || total.nil?) ? 1 : total
        (((num || 0).to_f / t.to_f) * 100).to_i
      end
    end

    # Return the time of the status initialization. If set returns a Time
    # object, otherwise returns nil. Overrides the reader generated by
    # `hash_accessor :time`.
    def time
      time? ? Time.at(self['time']) : nil
    end

    # Define queued?, working?, completed?, failed? and killed?, each true
    # when the stored 'status' equals that name.
    STATUSES.each do |status|
      define_method("#{status}?") do
        self['status'] == status
      end
    end

    # Can the job be killed? 'failed', 'completed', and 'killed' jobs can't
    # be killed (for pretty obvious reasons).
    def killable?
      !['failed', 'completed', 'killed'].include?(self.status)
    end

    # Only provide to_json when nothing else has already defined it.
    unless method_defined?(:to_json)
      def to_json(*args)
        json
      end
    end

    # Return a JSON representation of the current object, including the
    # computed 'pct_complete'. The receiver itself is not modified.
    def json
      h = self.dup
      h['pct_complete'] = pct_complete
      self.class.encode(h)
    end

    # Human-readable representation: the Hash contents wrapped in a
    # Resque::Status tag.
    def inspect
      "#<Resque::Status #{super}>"
    end
  end
end