lib/sidekiq/cron/job.rb in sidekiq-cron-1.2.0 vs lib/sidekiq/cron/job.rb in sidekiq-cron-1.3.0

- old
+ new

@@ -12,10 +12,13 @@ #how long we would like to store informations about previous enqueues REMEMBER_THRESHOLD = 24 * 60 * 60 LAST_ENQUEUE_TIME_FORMAT = '%Y-%m-%d %H:%M:%S %z' + # Use the exists? method if we're on a newer version of redis. + REDIS_EXISTS_METHOD = Gem.loaded_specs['redis'].version < Gem::Version.new('4.2') ? :exists : :exists? + #crucial part of whole enquing job def should_enque? time enqueue = false enqueue = Sidekiq.redis do |conn| status == "enabled" && @@ -201,13 +204,13 @@ # get all cron jobs def self.all job_hashes = nil Sidekiq.redis do |conn| set_members = conn.smembers(jobs_key) - job_hashes = conn.pipelined do + job_hashes = conn.pipelined do |pipeline| set_members.each do |key| - conn.hgetall(key) + pipeline.hgetall(key) end end end job_hashes.compact.reject(&:empty?).collect do |h| # no need to fetch missing args from redis since we just got this hash from there @@ -231,11 +234,11 @@ Sidekiq.redis do |conn| if exists? name output = Job.new conn.hgetall( redis_key(name) ) end end - output + output if output && output.valid? end # create new instance of cron job def self.create hash new(hash).save @@ -277,10 +280,11 @@ else @last_enqueue_time = last_enqueue_time_from_redis end #get right arguments for job + @symbolize_args = args["symbolize_args"] == true || ("#{args["symbolize_args"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false @args = args["args"].nil? ? [] : parse_args( args["args"] ) @args += [Time.now.to_f] if args["date_as_argument"] @active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false @active_job_queue_name_prefix = args["queue_name_prefix"] @@ -399,10 +403,11 @@ status: @status, active_job: @active_job, queue_name_prefix: @active_job_queue_name_prefix, queue_name_delimiter: @active_job_queue_name_delimiter, last_enqueue_time: @last_enqueue_time, + symbolize_args: @symbolize_args, } end def errors @errors ||= [] @@ -459,11 +464,11 @@ #add informations for this job! conn.hmset redis_key, *hash_to_redis(to_hash) #add information about last time! - don't enque right after scheduler poller starts! time = Time.now.utc - conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless conn.exists(job_enqueued_key) + conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless conn.public_send(REDIS_EXISTS_METHOD, job_enqueued_key) end logger.info { "Cron Jobs - add job with name: #{@name}" } end def save_last_enqueue_time @@ -538,11 +543,11 @@ end def self.exists? name out = false Sidekiq.redis do |conn| - out = conn.exists redis_key name + out = conn.public_send(REDIS_EXISTS_METHOD, redis_key(name)) end out end def exists? @@ -569,19 +574,40 @@ # to string array. def parse_args(args) case args when String begin - Sidekiq.load_json(args) + parsed_args = Sidekiq.load_json(args) + symbolize_args? ? symbolize_args(parsed_args) : parsed_args rescue JSON::ParserError [*args] # cast to string array end when Hash - [args] # just put hash into array + symbolize_args? ? [symbolize_args(args)] : [args] when Array - args # do nothing, already array + symbolize_args? ? symbolize_args(args) : args else [*args] # cast to string array + end + end + + def symbolize_args? + @symbolize_args + end + + def symbolize_args(input) + if input.is_a?(Array) + input.map do |arg| + if arg.respond_to?(:symbolize_keys) + arg.symbolize_keys + else + arg + end + end + elsif input.is_a?(Hash) && input.respond_to?(:symbolize_keys) + input.symbolize_keys + else + input end end def parse_enqueue_time(timestamp) DateTime.strptime(timestamp, LAST_ENQUEUE_TIME_FORMAT).to_time.utc