lib/sidekiq/cron/job.rb in sidekiq-cron-1.6.0 vs lib/sidekiq/cron/job.rb in sidekiq-cron-1.7.0
- old
+ new
@@ -1,20 +1,23 @@
require 'fugit'
require 'sidekiq'
require 'sidekiq/cron/support'
+require 'sidekiq/options'
module Sidekiq
module Cron
class Job
- #how long we would like to store informations about previous enqueues
+ # How long we would like to store information about previous enqueues.
REMEMBER_THRESHOLD = 24 * 60 * 60
+
+ # Time format for enqueued jobs.
LAST_ENQUEUE_TIME_FORMAT = '%Y-%m-%d %H:%M:%S %z'
- # Use the exists? method if we're on a newer version of redis.
+ # Use the exists? method if we're on a newer version of Redis.
REDIS_EXISTS_METHOD = Gem.loaded_specs['redis'].version < Gem::Version.new('4.2') ? :exists : :exists?
- #crucial part of whole enquing job
+ # The crucial part of the whole enqueuing process: checks whether the job should be enqueued at the given time.
def should_enque? time
enqueue = false
enqueue = Sidekiq.redis do |conn|
status == "enabled" &&
not_past_scheduled_time?(time) &&
@@ -22,30 +25,28 @@
conn.zadd(job_enqueued_key, formated_enqueue_time(time), formated_last_time(time))
end
enqueue
end
- # remove previous informations about run times
- # this will clear redis and make sure that redis will
- # not overflow with memory
+ # Remove previous information about run times;
+ # this cleans up Redis and makes sure it does not keep growing.
def remove_previous_enques time
Sidekiq.redis do |conn|
conn.zremrangebyscore(job_enqueued_key, 0, "(#{(time.to_f - REMEMBER_THRESHOLD).to_s}")
end
end
- #test if job should be enqued If yes add it to queue
+ # Test if the job should be enqueued and, if so, enqueue it.
def test_and_enque_for_time! time
- #should this job be enqued?
if should_enque?(time)
enque!
remove_previous_enques(time)
end
end
- #enque cron job to queue
+ # Enqueue cron job to queue.
def enque! time = Time.now.utc
@last_enqueue_time = time.strftime(LAST_ENQUEUE_TIME_FORMAT)
klass_const =
begin
@@ -78,21 +79,31 @@
@active_job || defined?(ActiveJob::Base) && Sidekiq::Cron::Support.constantize(@klass.to_s) < ActiveJob::Base
rescue NameError
false
end
+ def date_as_argument?
+ !!@date_as_argument
+ end
+
+ def enqueue_args
+ date_as_argument? ? @args + [Time.now.to_f] : @args
+ end
+
def enqueue_active_job(klass_const)
- klass_const.set(queue: @queue).perform_later(*@args)
+ klass_const.set(queue: @queue).perform_later(*enqueue_args)
end
def enqueue_sidekiq_worker(klass_const)
- klass_const.set(queue: queue_name_with_prefix).perform_async(*@args)
+ klass_const.set(queue: queue_name_with_prefix).perform_async(*enqueue_args)
end
- # siodekiq worker message
+ # Sidekiq worker message.
def sidekiq_worker_message
- @message.is_a?(String) ? Sidekiq.load_json(@message) : @message
+ message = @message.is_a?(String) ? Sidekiq.load_json(@message) : @message
+ message["args"] = enqueue_args
+ message
end
def queue_name_with_prefix
return @queue unless is_active_job?
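One of the behavioural changes in this release: with `date_as_argument` enabled, the enqueue timestamp is now appended via `enqueue_args` on every enqueue instead of once at initialization. A minimal sketch of how that surfaces on the worker side (the worker class, job name and schedule below are hypothetical):

    class HardWorker
      include Sidekiq::Worker

      # With date_as_argument the enqueue timestamp arrives as the last argument.
      def perform(answer, enqueued_at)
        Time.at(enqueued_at) # time of this particular enqueue, not of job creation
      end
    end

    Sidekiq::Cron::Job.create(
      name:  'hard_worker - every 5 min',
      cron:  '*/5 * * * *',
      class: 'HardWorker',
      args:  [42],
      date_as_argument: true
    )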
@@ -113,29 +124,29 @@
end
queue_name
end
- # active job has different structure how it is loading data from sidekiq
- # queue, it createaswrapper arround job
+ # Active Job loads data from the Sidekiq queue with a different structure,
+ # so it creates a wrapper around the job.
def active_job_message
{
'class' => 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper',
'wrapped' => @klass,
'queue' => @queue_name_with_prefix,
'description' => @description,
'args' => [{
'job_class' => @klass,
'job_id' => SecureRandom.uuid,
'queue_name' => @queue_name_with_prefix,
- 'arguments' => @args
+ 'arguments' => enqueue_args
}]
}
end
- # load cron jobs from Hash
- # input structure should look like:
+ # Load cron jobs from Hash.
+ # Input structure should look like:
# {
# 'name_of_job' => {
# 'class' => 'MyClass',
# 'cron' => '1 * * * *',
# 'args' => '(OPTIONAL) [Array or Hash]',
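As the comment above documents, `load_from_hash` keys the input by job name. A short usage sketch under that structure (the worker class and schedule are hypothetical); the call returns a hash of validation errors keyed by job name, empty when every job saved:

    errors = Sidekiq::Cron::Job.load_from_hash(
      'daily_report' => {
        'class' => 'ReportWorker',
        'cron'  => '0 6 * * *',
        'args'  => ['daily']
      }
    )
    errors.empty? # => true when every job was valid and saved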
@@ -153,19 +164,19 @@
out << job
end
load_from_array array
end
- # like to {#load_from_hash}
- # If exists old jobs in redis but removed from args, destroy old jobs
+ # Like #load_from_hash.
+ # If old jobs exist in Redis but are missing from the given hash, destroy them.
def self.load_from_hash! hash
destroy_removed_jobs(hash.keys)
load_from_hash(hash)
end
- # load cron jobs from Array
- # input structure should look like:
+ # Load cron jobs from Array.
+ # Input structure should look like:
# [
# {
# 'name' => 'name_of_job',
# 'class' => 'MyClass',
# 'cron' => '1 * * * *',
@@ -186,19 +197,19 @@
errors[job.name] = job.errors unless job.save
end
errors
end
- # like to {#load_from_array}
- # If exists old jobs in redis but removed from args, destroy old jobs
+ # Like #load_from_array.
+ # If old jobs exist in Redis but are missing from the given array, destroy them.
def self.load_from_array! array
job_names = array.map { |job| job["name"] }
destroy_removed_jobs(job_names)
load_from_array(array)
end
- # get all cron jobs
+ # Get all cron jobs.
def self.all
job_hashes = nil
Sidekiq.redis do |conn|
set_members = conn.smembers(jobs_key)
job_hashes = conn.pipelined do |pipeline|
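`load_from_array!`, defined just above, pairs loading with cleanup: jobs stored in Redis but absent from the given array are destroyed first. A sketch with hypothetical job definitions:

    schedule = [
      { 'name' => 'cleanup', 'class' => 'CleanupWorker', 'cron' => '0 * * * *' },
      { 'name' => 'digest',  'class' => 'DigestWorker',  'cron' => '30 7 * * *' }
    ]

    # Any previously stored cron job not named 'cleanup' or 'digest' is destroyed,
    # then the two jobs above are created or updated.
    Sidekiq::Cron::Job.load_from_array!(schedule)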
@@ -206,11 +217,11 @@
pipeline.hgetall(key)
end
end
end
job_hashes.compact.reject(&:empty?).collect do |h|
- # no need to fetch missing args from redis since we just got this hash from there
+ # No need to fetch missing args from Redis since we just got this hash from there.
Sidekiq::Cron::Job.new(h.merge(fetch_missing_args: false))
end
end
def self.count
@@ -220,11 +231,11 @@
end
out
end
def self.find name
- #if name is hash try to get name from it
+ # If name is a hash, try to get the name from it.
name = name[:name] || name['name'] if name.is_a?(Hash)
output = nil
Sidekiq.redis do |conn|
if exists? name
@@ -232,18 +243,18 @@
end
end
output if output && output.valid?
end
- # create new instance of cron job
+ # Create new instance of cron job.
def self.create hash
new(hash).save
end
- #destroy job by name
+ # Destroy job by name.
def self.destroy name
- #if name is hash try to get name from it
+ # If name is a hash, try to get the name from it.
name = name[:name] || name['name'] if name.is_a?(Hash)
if job = find(name)
job.destroy
else
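Both `find` and `destroy` accept either a plain job name or a hash carrying a 'name' key, as the comments above note. A small sketch (the job name is hypothetical):

    job = Sidekiq::Cron::Job.find('daily_report')           # => job instance, or nil if missing or invalid
    job = Sidekiq::Cron::Job.find('name' => 'daily_report') # same lookup via a hash

    Sidekiq::Cron::Job.destroy('daily_report')               # removes the job if it exists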
@@ -261,28 +272,29 @@
@name = args["name"]
@cron = args["cron"]
@description = args["description"] if args["description"]
- #get class from klass or class
+ # Get the class from the "klass" or "class" key.
@klass = args["klass"] || args["class"]
- #set status of job
+ # Set status of job.
@status = args['status'] || status_from_redis
- #set last enqueue time - from args or from existing job
+ # Set last enqueue time - from args or from existing job.
if args['last_enqueue_time'] && !args['last_enqueue_time'].empty?
@last_enqueue_time = parse_enqueue_time(args['last_enqueue_time'])
else
@last_enqueue_time = last_enqueue_time_from_redis
end
- #get right arguments for job
+ # Get the right arguments for the job.
@symbolize_args = args["symbolize_args"] == true || ("#{args["symbolize_args"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
@args = args["args"].nil? ? [] : parse_args( args["args"] )
- @args += [Time.now.to_f] if args["date_as_argument"]
+ @date_as_argument = args["date_as_argument"] == true || ("#{args["date_as_argument"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
+
@active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
@active_job_queue_name_prefix = args["queue_name_prefix"]
@active_job_queue_name_delimiter = args["queue_name_delimiter"]
if args["message"]
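The boolean options above (`symbolize_args`, `date_as_argument`, `active_job`) are coerced from either real booleans or truthy strings such as 'true', 'yes' or '1', which is handy when the schedule comes from YAML. A sketch, assuming a reachable Redis as with any sidekiq-cron call (the worker class is hypothetical and does not need to exist for the flag check):

    ['true', 'yes', '1', true].each do |value|
      job = Sidekiq::Cron::Job.new(
        'name'             => 'demo',
        'cron'             => '* * * * *',
        'class'            => 'DemoWorker',
        'date_as_argument' => value
      )
      job.date_as_argument? # => true for every value above
    end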
@@ -293,34 +305,34 @@
message_data = {
"class" => @klass.to_s,
"args" => @args,
}
- #get right data for message
- #only if message wasn't specified before
+ # Get the right data for the message,
+ # but only if the message wasn't specified before.
klass_data = case @klass
when Class
@klass.get_sidekiq_options
when String
begin
Sidekiq::Cron::Support.constantize(@klass).get_sidekiq_options
rescue Exception => e
- #Unknown class
+ # Unknown class
{"queue"=>"default"}
end
end
message_data = klass_data.merge(message_data)
- #override queue if setted in config
- #only if message is hash - can be string (dumped JSON)
+
+ # Override the queue if it is set in the config,
+ # but only if the message is a hash - it can also be a string (dumped JSON).
if args['queue']
@queue = message_data['queue'] = args['queue']
else
@queue = message_data['queue'] || "default"
end
- #dump message as json
@message = message_data
end
@queue_name_with_prefix = queue_name_with_prefix
end
@@ -378,24 +390,24 @@
out =
Sidekiq.redis do |conn|
conn.lrange(jid_history_key, 0, -1) rescue nil
end
- # returns nil if out nil
out && out.map do |jid_history_raw|
Sidekiq.load_json jid_history_raw
end
end
- #export job data to hash
+ # Export job data to hash.
def to_hash
{
name: @name,
klass: @klass,
cron: @cron,
description: @description,
args: @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || []),
+ date_as_argument: @date_as_argument,
message: @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {}),
status: @status,
active_job: @active_job,
queue_name_prefix: @active_job_queue_name_prefix,
queue_name_delimiter: @active_job_queue_name_delimiter,
@@ -407,19 +419,27 @@
def errors
@errors ||= []
end
def valid?
- #clear previous errors
+ # Clear previous errors.
@errors = []
errors << "'name' must be set" if @name.nil? || @name.size == 0
if @cron.nil? || @cron.size == 0
errors << "'cron' must be set"
else
begin
- @parsed_cron = Fugit.do_parse_cron(@cron)
+ c = Fugit.do_parse(@cron)
+
+ # Since `Fugit.do_parse` might yield a Fugit::Duration or an EtOrbi::EoTime
+ # https://github.com/floraison/fugit#fugitparses
+ if c.is_a?(Fugit::Cron)
+ @parsed_cron = c
+ else
+ errors << "'cron' -> #{@cron.inspect} -> not a cron but a #{c.class}"
+ end
rescue => e
errors << "'cron' -> #{@cron.inspect} -> #{e.class}: #{e.message}"
end
end
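The validation change above uses `Fugit.do_parse`, which accepts more input shapes than a plain cron string, and then checks the class of the result so that anything that is not a `Fugit::Cron` becomes a validation error instead of silently passing. For reference, what `Fugit.parse` returns for a few inputs (the first and second examples mirror the fugit README):

    require 'fugit'

    Fugit.parse('*/5 * * * *')        # => Fugit::Cron
    Fugit.parse('every day at five')  # => Fugit::Cron, via the natural-language parser
    Fugit.parse('1h30m')              # => Fugit::Duration, now rejected by valid? with a clear error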
@@ -436,89 +456,78 @@
@klass.size > 0
else
end
end
- # add job to cron jobs
- # input:
- # name: (string) - name of job
- # cron: (string: '* * * * *' - cron specification when to run job
- # class: (string|class) - which class to perform
- # optional input:
- # queue: (string) - which queue to use for enquing (will override class queue)
- # args: (array|hash|nil) - arguments for permorm method
-
def save
- #if job is invalid return false
+ # If job is invalid, return false.
return false unless valid?
Sidekiq.redis do |conn|
- #add to set of all jobs
+ # Add to set of all jobs
conn.sadd self.class.jobs_key, redis_key
- #add informations for this job!
+ # Add information for this job!
conn.hmset redis_key, *hash_to_redis(to_hash)
- #add information about last time! - don't enque right after scheduler poller starts!
+ # Add information about the last run time - don't enqueue right after the scheduler poller starts!
time = Time.now.utc
conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless conn.public_send(REDIS_EXISTS_METHOD, job_enqueued_key)
end
Sidekiq.logger.info { "Cron Jobs - added job with name: #{@name}" }
end
def save_last_enqueue_time
Sidekiq.redis do |conn|
- # update last enqueue time
+ # Update last enqueue time.
conn.hset redis_key, 'last_enqueue_time', @last_enqueue_time
end
end
def add_jid_history(jid)
jid_history = {
jid: jid,
enqueued: @last_enqueue_time
}
- cron_history_size = Sidekiq.respond_to?(:[]) ? Sidekiq[:cron_history_size] : Sidekiq.options[:cron_history_size]
- @history_size ||= (cron_history_size || 10).to_i - 1
+
+ @history_size ||= (Sidekiq::Options[:cron_history_size] || 10).to_i - 1
Sidekiq.redis do |conn|
conn.lpush jid_history_key,
Sidekiq.dump_json(jid_history)
- # keep only last 10 entries in a fifo manner
+ # Keep only the most recent entries (10 by default) in a FIFO manner.
conn.ltrim jid_history_key, 0, @history_size
end
end
- # remove job from cron jobs by name
- # input:
- # first arg: name (string) - name of job (must be same - case sensitive)
def destroy
Sidekiq.redis do |conn|
- #delete from set
+ # Delete from set.
conn.srem self.class.jobs_key, redis_key
- #delete runned timestamps
+ # Delete run timestamps.
conn.del job_enqueued_key
- # delete jid_history
+ # Delete jid_history.
conn.del jid_history_key
- #delete main job
+ # Delete main job.
conn.del redis_key
end
+
Sidekiq.logger.info { "Cron Jobs - deleted job with name: #{@name}" }
end
- # remove all job from cron
+ # Remove all jobs from cron.
def self.destroy_all!
all.each do |job|
job.destroy
end
Sidekiq.logger.info { "Cron Jobs - deleted all jobs" }
end
- # remove "removed jobs" between current jobs and new jobs
+ # Remove "removed jobs" between current jobs and new jobs
def self.destroy_removed_jobs new_job_names
current_job_names = Sidekiq::Cron::Job.all.map(&:name)
removed_job_names = current_job_names - new_job_names
removed_job_names.each { |j| Sidekiq::Cron::Job.destroy(j) }
removed_job_names
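The jid history size (earlier in this hunk) is now read through `Sidekiq::Options`, the small compatibility wrapper required at the top of the file, instead of probing `Sidekiq.respond_to?(:[])`. A hedged sketch of reading and raising the limit; the setter form depends on your Sidekiq version's configuration API, so treat it as an assumption to verify:

    # Read side, as the gem itself does; the code falls back to 10 when unset.
    Sidekiq::Options[:cron_history_size]

    # One possible write side, on Sidekiq versions where global options are hash-like.
    Sidekiq[:cron_history_size] = 25 if Sidekiq.respond_to?(:[]=)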
@@ -555,36 +564,47 @@
end
private
def parsed_cron
- @parsed_cron ||= Fugit.parse_cron(@cron)
+ @parsed_cron ||= begin
+ c = Fugit.parse(@cron)
+
+ # Since `Fugit.parse` might yield a Fugit::Duration or an EtOrbi::EoTime
+ # https://github.com/floraison/fugit#fugitparses
+ if c.is_a?(Fugit::Cron)
+ c
+ else
+ errors << "'cron' -> #{@cron.inspect} -> not a cron but a #{c.class}"
+ end
+ rescue => e
+ errors << "'cron' -> #{@cron.inspect} -> #{e.class}: #{e.message}"
+ end
end
def not_enqueued_after?(time)
@last_enqueue_time.nil? || @last_enqueue_time.to_i < last_time(time).to_i
end
# Try parsing inbound args into an array.
- # args from Redis will be encoded JSON;
- # try to load JSON, then failover
- # to string array.
+ # Args from Redis will be JSON encoded;
+ # try to load the JSON and fall back to a string array on failure.
def parse_args(args)
case args
when String
begin
parsed_args = Sidekiq.load_json(args)
symbolize_args? ? symbolize_args(parsed_args) : parsed_args
rescue JSON::ParserError
- [*args] # cast to string array
+ [*args]
end
when Hash
symbolize_args? ? [symbolize_args(args)] : [args]
when Array
symbolize_args? ? symbolize_args(args) : args
else
- [*args] # cast to string array
+ [*args]
end
end
def symbolize_args?
@symbolize_args
@@ -612,55 +632,48 @@
DateTime.parse(timestamp).to_time.utc
end
def not_past_scheduled_time?(current_time)
last_cron_time = parsed_cron.previous_time(current_time).utc
- # or could it be?
- #last_cron_time = last_time(current_time)
return false if (current_time.to_i - last_cron_time.to_i) > 60
true
end
- # Redis key for set of all cron jobs
+ # Redis key for set of all cron jobs.
def self.jobs_key
"cron_jobs"
end
- # Redis key for storing one cron job
+ # Redis key for storing one cron job.
def self.redis_key name
"cron_job:#{name}"
end
- # Redis key for storing one cron job
+ # Redis key for storing one cron job.
def redis_key
self.class.redis_key @name
end
- # Redis key for storing one cron job run times
- # (when poller added job to queue)
+ # Redis key for storing one cron job's run times (when the poller added the job to the queue).
def self.job_enqueued_key name
"cron_job:#{name}:enqueued"
end
def self.jid_history_key name
"cron_job:#{name}:jid_history"
end
- # Redis key for storing one cron job run times
- # (when poller added job to queue)
def job_enqueued_key
self.class.job_enqueued_key @name
end
def jid_history_key
self.class.jid_history_key @name
end
- # Give Hash
- # returns array for using it for redis.hmset
+ # Given a Hash, returns an array suitable for redis.hmset.
def hash_to_redis hash
hash.inject([]){ |arr,kv| arr + [kv[0], kv[1]] }
end
-
end
end
end
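For completeness, `parse_args` in the final hunk normalizes whatever arrives under 'args' (a JSON string from Redis, a hash, an array, or a bare value) into an array. A sketch of the resulting shapes, assuming the public `args` reader and a reachable Redis (the job attributes are hypothetical):

    base = { 'name' => 'demo', 'cron' => '* * * * *', 'class' => 'DemoWorker' }

    Sidekiq::Cron::Job.new(base.merge('args' => '[1, "a"]')).args    # => [1, "a"]        JSON string decoded
    Sidekiq::Cron::Job.new(base.merge('args' => { 'id' => 1 })).args # => [{ 'id' => 1 }]  hash wrapped in an array
    Sidekiq::Cron::Job.new(base.merge('args' => 'plain')).args       # => ["plain"]        not JSON, cast to a string array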