lib/sidekiq/cron/job.rb in sidekiq-cron-1.2.0 vs lib/sidekiq/cron/job.rb in sidekiq-cron-1.3.0
- old
+ new
@@ -12,10 +12,13 @@
#how long we would like to store informations about previous enqueues
REMEMBER_THRESHOLD = 24 * 60 * 60
LAST_ENQUEUE_TIME_FORMAT = '%Y-%m-%d %H:%M:%S %z'
+ # Use the exists? method if we're on a newer version of redis.
+ REDIS_EXISTS_METHOD = Gem.loaded_specs['redis'].version < Gem::Version.new('4.2') ? :exists : :exists?
+
#crucial part of whole enquing job
def should_enque? time
enqueue = false
enqueue = Sidekiq.redis do |conn|
status == "enabled" &&
@@ -201,13 +204,13 @@
# get all cron jobs
def self.all
job_hashes = nil
Sidekiq.redis do |conn|
set_members = conn.smembers(jobs_key)
- job_hashes = conn.pipelined do
+ job_hashes = conn.pipelined do |pipeline|
set_members.each do |key|
- conn.hgetall(key)
+ pipeline.hgetall(key)
end
end
end
job_hashes.compact.reject(&:empty?).collect do |h|
# no need to fetch missing args from redis since we just got this hash from there
@@ -231,11 +234,11 @@
Sidekiq.redis do |conn|
if exists? name
output = Job.new conn.hgetall( redis_key(name) )
end
end
- output
+ output if output && output.valid?
end
# create new instance of cron job
def self.create hash
new(hash).save
@@ -277,10 +280,11 @@
else
@last_enqueue_time = last_enqueue_time_from_redis
end
#get right arguments for job
+ @symbolize_args = args["symbolize_args"] == true || ("#{args["symbolize_args"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
@args = args["args"].nil? ? [] : parse_args( args["args"] )
@args += [Time.now.to_f] if args["date_as_argument"]
@active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
@active_job_queue_name_prefix = args["queue_name_prefix"]
@@ -399,10 +403,11 @@
status: @status,
active_job: @active_job,
queue_name_prefix: @active_job_queue_name_prefix,
queue_name_delimiter: @active_job_queue_name_delimiter,
last_enqueue_time: @last_enqueue_time,
+ symbolize_args: @symbolize_args,
}
end
def errors
@errors ||= []
@@ -459,11 +464,11 @@
#add informations for this job!
conn.hmset redis_key, *hash_to_redis(to_hash)
#add information about last time! - don't enque right after scheduler poller starts!
time = Time.now.utc
- conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless conn.exists(job_enqueued_key)
+ conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless conn.public_send(REDIS_EXISTS_METHOD, job_enqueued_key)
end
logger.info { "Cron Jobs - add job with name: #{@name}" }
end
def save_last_enqueue_time
@@ -538,11 +543,11 @@
end
def self.exists? name
out = false
Sidekiq.redis do |conn|
- out = conn.exists redis_key name
+ out = conn.public_send(REDIS_EXISTS_METHOD, redis_key(name))
end
out
end
def exists?
@@ -569,19 +574,40 @@
# to string array.
def parse_args(args)
case args
when String
begin
- Sidekiq.load_json(args)
+ parsed_args = Sidekiq.load_json(args)
+ symbolize_args? ? symbolize_args(parsed_args) : parsed_args
rescue JSON::ParserError
[*args] # cast to string array
end
when Hash
- [args] # just put hash into array
+ symbolize_args? ? [symbolize_args(args)] : [args]
when Array
- args # do nothing, already array
+ symbolize_args? ? symbolize_args(args) : args
else
[*args] # cast to string array
+ end
+ end
+
+ def symbolize_args?
+ @symbolize_args
+ end
+
+ def symbolize_args(input)
+ if input.is_a?(Array)
+ input.map do |arg|
+ if arg.respond_to?(:symbolize_keys)
+ arg.symbolize_keys
+ else
+ arg
+ end
+ end
+ elsif input.is_a?(Hash) && input.respond_to?(:symbolize_keys)
+ input.symbolize_keys
+ else
+ input
end
end
def parse_enqueue_time(timestamp)
DateTime.strptime(timestamp, LAST_ENQUEUE_TIME_FORMAT).to_time.utc