lib/sidekiq/cron/job.rb in sidekiq-cron-2.0.0.rc1 vs lib/sidekiq/cron/job.rb in sidekiq-cron-2.0.0.rc2
- old
+ new
@@ -1,26 +1,21 @@
# frozen_string_literal: true
require 'fugit'
require 'cronex'
require 'globalid'
-require 'sidekiq'
require 'sidekiq/cron/support'
-require 'sidekiq/options'
module Sidekiq
module Cron
class Job
# How long we would like to store information about previous enqueues.
REMEMBER_THRESHOLD = 24 * 60 * 60
# Time format for enqueued jobs.
LAST_ENQUEUE_TIME_FORMAT = '%Y-%m-%d %H:%M:%S %z'
- # Use the exists? method if we're on a newer version of Redis.
- REDIS_EXISTS_METHOD = Gem::Version.new(Sidekiq::VERSION) >= Gem::Version.new("7.0.0") || Gem.loaded_specs['redis'].version < Gem::Version.new('4.2') ? :exists : :exists?
-
# Use serialize/deserialize key of GlobalID.
GLOBALID_KEY = "_sc_globalid"
attr_accessor :name, :namespace, :cron, :description, :klass, :args, :message
attr_reader :last_enqueue_time, :fetch_missing_args, :source
@@ -90,11 +85,11 @@
@queue_name_with_prefix = queue_name_with_prefix
end
# Crucial part of whole enqueuing job.
- def should_enque? time
+ def should_enqueue? time
return false unless status == "enabled"
return false if past_scheduled_time?(time)
return false if enqueued_after?(time)
enqueue = Sidekiq.redis do |conn|
@@ -103,27 +98,27 @@
enqueue == true || enqueue == 1
end
# Remove previous information about run times,
# this will clear Redis and make sure that Redis will not overflow with memory.
- def remove_previous_enques time
+ def remove_previous_enqueues time
Sidekiq.redis do |conn|
conn.zremrangebyscore(job_enqueued_key, 0, "(#{(time.to_f - REMEMBER_THRESHOLD).to_s}")
end
end
# Test if job should be enqueued.
- def test_and_enque_for_time! time
- if should_enque?(time)
- enque!
+ def test_and_enqueue_for_time! time
+ if should_enqueue?(time)
+ enqueue!
- remove_previous_enques(time)
+ remove_previous_enqueues(time)
end
end
# Enqueue cron job to queue.
- def enque! time = Time.now.utc
+ def enqueue! time = Time.now.utc
@last_enqueue_time = time
klass_const =
begin
Sidekiq::Cron::Support.constantize(@klass.to_s)
@@ -411,32 +406,27 @@
end
end
# Export job data to hash.
def to_hash
- hash = {
+ {
name: @name,
namespace: @namespace,
klass: @klass.to_s,
cron: @cron,
description: @description,
source: @source,
args: @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || []),
+ date_as_argument: date_as_argument? ? "1" : "0",
message: @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {}),
status: @status,
active_job: @active_job ? "1" : "0",
queue_name_prefix: @active_job_queue_name_prefix,
queue_name_delimiter: @active_job_queue_name_delimiter,
last_enqueue_time: serialized_last_enqueue_time,
symbolize_args: symbolize_args? ? "1" : "0",
}
-
- if date_as_argument?
- hash.merge!(date_as_argument: "1")
- end
-
- hash
end
def errors
@errors ||= []
end
@@ -445,10 +435,11 @@
# Clear previous errors.
@errors = []
errors << "'name' must be set" if @name.nil? || @name.size == 0
errors << "'namespace' must be set" if @namespace.nil? || @namespace.size == 0
+ errors << "'namespace' cannot be '*'" if @namespace == "*"
if @cron.nil? || @cron.size == 0
errors << "'cron' must be set"
else
begin
@@ -480,18 +471,23 @@
Sidekiq.redis do |conn|
# Add to set of all jobs
conn.sadd self.class.jobs_key(@namespace), [redis_key]
# Add information for this job!
- conn.hset redis_key, to_hash.transform_values! { |v| v || '' }
+ conn.hset redis_key, to_hash.transform_values! { |v| v || '' }.flatten
- # Add information about last time! - don't enque right after scheduler poller starts!
+ # Add information about last time! - don't enqueue right after scheduler poller starts!
time = Time.now.utc
- exists = conn.public_send(REDIS_EXISTS_METHOD, job_enqueued_key)
- conn.zadd(job_enqueued_key, time.to_f.to_s, formatted_last_time(time).to_s) unless exists == true || exists == 1
+ exists = conn.exists(job_enqueued_key)
+
+ unless exists == true || exists == 1
+ conn.zadd(job_enqueued_key, time.to_f.to_s, formatted_last_time(time).to_s)
+ Sidekiq.logger.info { "Cron Jobs - added job with name #{@name} in the namespace #{@namespace}" }
+ end
end
- Sidekiq.logger.info { "Cron Jobs - added job with name #{@name} in the namespace #{@namespace}" }
+
+ true
end
def save_last_enqueue_time
Sidekiq.redis do |conn|
# Update last enqueue time.
@@ -503,11 +499,11 @@
jid_history = {
jid: jid,
enqueued: @last_enqueue_time
}
- @history_size ||= (Sidekiq::Options[:cron_history_size] || 10).to_i - 1
+ @history_size ||= Sidekiq::Cron.configuration.cron_history_size.to_i - 1
Sidekiq.redis do |conn|
conn.lpush jid_history_key,
Sidekiq.dump_json(jid_history)
# Keep only last 10 entries in a fifo manner.
conn.ltrim jid_history_key, 0, @history_size
@@ -570,10 +566,10 @@
last_time(now).getutc.iso8601
end
def self.exists?(name, namespace = Sidekiq::Cron.configuration.default_namespace)
out = Sidekiq.redis do |conn|
- conn.public_send(REDIS_EXISTS_METHOD, redis_key(name, namespace))
+ conn.exists(redis_key(name, namespace))
end
[true, 1].include?(out)
end