lib/sidekiq/cron/job.rb in sidekiq-cron-1.6.0 vs lib/sidekiq/cron/job.rb in sidekiq-cron-1.7.0

- old
+ new

@@ -1,20 +1,23 @@ require 'fugit' require 'sidekiq' require 'sidekiq/cron/support' +require 'sidekiq/options' module Sidekiq module Cron class Job - #how long we would like to store informations about previous enqueues + # How long we would like to store informations about previous enqueues. REMEMBER_THRESHOLD = 24 * 60 * 60 + + # Time format for enqueued jobs. LAST_ENQUEUE_TIME_FORMAT = '%Y-%m-%d %H:%M:%S %z' - # Use the exists? method if we're on a newer version of redis. + # Use the exists? method if we're on a newer version of Redis. REDIS_EXISTS_METHOD = Gem.loaded_specs['redis'].version < Gem::Version.new('4.2') ? :exists : :exists? - #crucial part of whole enquing job + # Crucial part of whole enqueuing job. def should_enque? time enqueue = false enqueue = Sidekiq.redis do |conn| status == "enabled" && not_past_scheduled_time?(time) && @@ -22,30 +25,28 @@ conn.zadd(job_enqueued_key, formated_enqueue_time(time), formated_last_time(time)) end enqueue end - # remove previous informations about run times - # this will clear redis and make sure that redis will - # not overflow with memory + # Remove previous information about run times, + # this will clear Redis and make sure that Redis will not overflow with memory. def remove_previous_enques time Sidekiq.redis do |conn| conn.zremrangebyscore(job_enqueued_key, 0, "(#{(time.to_f - REMEMBER_THRESHOLD).to_s}") end end - #test if job should be enqued If yes add it to queue + # Test if job should be enqueued. def test_and_enque_for_time! time - #should this job be enqued? if should_enque?(time) enque! remove_previous_enques(time) end end - #enque cron job to queue + # Enqueue cron job to queue. def enque! time = Time.now.utc @last_enqueue_time = time.strftime(LAST_ENQUEUE_TIME_FORMAT) klass_const = begin @@ -78,21 +79,31 @@ @active_job || defined?(ActiveJob::Base) && Sidekiq::Cron::Support.constantize(@klass.to_s) < ActiveJob::Base rescue NameError false end + def date_as_argument? + !!@date_as_argument + end + + def enqueue_args + date_as_argument? ? @args + [Time.now.to_f] : @args + end + def enqueue_active_job(klass_const) - klass_const.set(queue: @queue).perform_later(*@args) + klass_const.set(queue: @queue).perform_later(*enqueue_args) end def enqueue_sidekiq_worker(klass_const) - klass_const.set(queue: queue_name_with_prefix).perform_async(*@args) + klass_const.set(queue: queue_name_with_prefix).perform_async(*enqueue_args) end - # siodekiq worker message + # Sidekiq worker message. def sidekiq_worker_message - @message.is_a?(String) ? Sidekiq.load_json(@message) : @message + message = @message.is_a?(String) ? Sidekiq.load_json(@message) : @message + message["args"] = enqueue_args + message end def queue_name_with_prefix return @queue unless is_active_job? @@ -113,29 +124,29 @@ end queue_name end - # active job has different structure how it is loading data from sidekiq - # queue, it createaswrapper arround job + # Active Job has different structure how it is loading data from Sidekiq + # queue, it creates a wrapper around job. def active_job_message { 'class' => 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper', 'wrapped' => @klass, 'queue' => @queue_name_with_prefix, 'description' => @description, 'args' => [{ 'job_class' => @klass, 'job_id' => SecureRandom.uuid, 'queue_name' => @queue_name_with_prefix, - 'arguments' => @args + 'arguments' => enqueue_args }] } end - # load cron jobs from Hash - # input structure should look like: + # Load cron jobs from Hash. + # Input structure should look like: # { # 'name_of_job' => { # 'class' => 'MyClass', # 'cron' => '1 * * * *', # 'args' => '(OPTIONAL) [Array or Hash]', @@ -153,19 +164,19 @@ out << job end load_from_array array end - # like to {#load_from_hash} - # If exists old jobs in redis but removed from args, destroy old jobs + # Like #load_from_hash. + # If exists old jobs in Redis but removed from args, destroy old jobs. def self.load_from_hash! hash destroy_removed_jobs(hash.keys) load_from_hash(hash) end - # load cron jobs from Array - # input structure should look like: + # Load cron jobs from Array. + # Input structure should look like: # [ # { # 'name' => 'name_of_job', # 'class' => 'MyClass', # 'cron' => '1 * * * *', @@ -186,19 +197,19 @@ errors[job.name] = job.errors unless job.save end errors end - # like to {#load_from_array} - # If exists old jobs in redis but removed from args, destroy old jobs + # Like #load_from_array. + # If exists old jobs in Redis but removed from args, destroy old jobs. def self.load_from_array! array job_names = array.map { |job| job["name"] } destroy_removed_jobs(job_names) load_from_array(array) end - # get all cron jobs + # Get all cron jobs. def self.all job_hashes = nil Sidekiq.redis do |conn| set_members = conn.smembers(jobs_key) job_hashes = conn.pipelined do |pipeline| @@ -206,11 +217,11 @@ pipeline.hgetall(key) end end end job_hashes.compact.reject(&:empty?).collect do |h| - # no need to fetch missing args from redis since we just got this hash from there + # No need to fetch missing args from Redis since we just got this hash from there Sidekiq::Cron::Job.new(h.merge(fetch_missing_args: false)) end end def self.count @@ -220,11 +231,11 @@ end out end def self.find name - #if name is hash try to get name from it + # If name is hash try to get name from it. name = name[:name] || name['name'] if name.is_a?(Hash) output = nil Sidekiq.redis do |conn| if exists? name @@ -232,18 +243,18 @@ end end output if output && output.valid? end - # create new instance of cron job + # Create new instance of cron job. def self.create hash new(hash).save end - #destroy job by name + # Destroy job by name. def self.destroy name - #if name is hash try to get name from it + # If name is hash try to get name from it. name = name[:name] || name['name'] if name.is_a?(Hash) if job = find(name) job.destroy else @@ -261,28 +272,29 @@ @name = args["name"] @cron = args["cron"] @description = args["description"] if args["description"] - #get class from klass or class + # Get class from klass or class. @klass = args["klass"] || args["class"] - #set status of job + # Set status of job. @status = args['status'] || status_from_redis - #set last enqueue time - from args or from existing job + # Set last enqueue time - from args or from existing job. if args['last_enqueue_time'] && !args['last_enqueue_time'].empty? @last_enqueue_time = parse_enqueue_time(args['last_enqueue_time']) else @last_enqueue_time = last_enqueue_time_from_redis end - #get right arguments for job + # Get right arguments for job. @symbolize_args = args["symbolize_args"] == true || ("#{args["symbolize_args"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false @args = args["args"].nil? ? [] : parse_args( args["args"] ) - @args += [Time.now.to_f] if args["date_as_argument"] + @date_as_argument = args["date_as_argument"] == true || ("#{args["date_as_argument"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false + @active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false @active_job_queue_name_prefix = args["queue_name_prefix"] @active_job_queue_name_delimiter = args["queue_name_delimiter"] if args["message"] @@ -293,34 +305,34 @@ message_data = { "class" => @klass.to_s, "args" => @args, } - #get right data for message - #only if message wasn't specified before + # Get right data for message, + # only if message wasn't specified before. klass_data = case @klass when Class @klass.get_sidekiq_options when String begin Sidekiq::Cron::Support.constantize(@klass).get_sidekiq_options rescue Exception => e - #Unknown class + # Unknown class {"queue"=>"default"} end end message_data = klass_data.merge(message_data) - #override queue if setted in config - #only if message is hash - can be string (dumped JSON) + + # Override queue if setted in config, + # only if message is hash - can be string (dumped JSON). if args['queue'] @queue = message_data['queue'] = args['queue'] else @queue = message_data['queue'] || "default" end - #dump message as json @message = message_data end @queue_name_with_prefix = queue_name_with_prefix end @@ -378,24 +390,24 @@ out = Sidekiq.redis do |conn| conn.lrange(jid_history_key, 0, -1) rescue nil end - # returns nil if out nil out && out.map do |jid_history_raw| Sidekiq.load_json jid_history_raw end end - #export job data to hash + # Export job data to hash. def to_hash { name: @name, klass: @klass, cron: @cron, description: @description, args: @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || []), + date_as_argument: @date_as_argument, message: @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {}), status: @status, active_job: @active_job, queue_name_prefix: @active_job_queue_name_prefix, queue_name_delimiter: @active_job_queue_name_delimiter, @@ -407,19 +419,27 @@ def errors @errors ||= [] end def valid? - #clear previous errors + # Clear previous errors. @errors = [] errors << "'name' must be set" if @name.nil? || @name.size == 0 if @cron.nil? || @cron.size == 0 errors << "'cron' must be set" else begin - @parsed_cron = Fugit.do_parse_cron(@cron) + c = Fugit.do_parse(@cron) + + # Since `Fugit.do_parse` might yield a Fugit::Duration or an EtOrbi::EoTime + # https://github.com/floraison/fugit#fugitparses + if c.is_a?(Fugit::Cron) + @parsed_cron = c + else + errors << "'cron' -> #{@cron.inspect} -> not a cron but a #{c.class}" + end rescue => e errors << "'cron' -> #{@cron.inspect} -> #{e.class}: #{e.message}" end end @@ -436,89 +456,78 @@ @klass.size > 0 else end end - # add job to cron jobs - # input: - # name: (string) - name of job - # cron: (string: '* * * * *' - cron specification when to run job - # class: (string|class) - which class to perform - # optional input: - # queue: (string) - which queue to use for enquing (will override class queue) - # args: (array|hash|nil) - arguments for permorm method - def save - #if job is invalid return false + # If job is invalid, return false. return false unless valid? Sidekiq.redis do |conn| - #add to set of all jobs + # Add to set of all jobs conn.sadd self.class.jobs_key, redis_key - #add informations for this job! + # Add informations for this job! conn.hmset redis_key, *hash_to_redis(to_hash) - #add information about last time! - don't enque right after scheduler poller starts! + # Add information about last time! - don't enque right after scheduler poller starts! time = Time.now.utc conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless conn.public_send(REDIS_EXISTS_METHOD, job_enqueued_key) end Sidekiq.logger.info { "Cron Jobs - added job with name: #{@name}" } end def save_last_enqueue_time Sidekiq.redis do |conn| - # update last enqueue time + # Update last enqueue time. conn.hset redis_key, 'last_enqueue_time', @last_enqueue_time end end def add_jid_history(jid) jid_history = { jid: jid, enqueued: @last_enqueue_time } - cron_history_size = Sidekiq.respond_to?(:[]) ? Sidekiq[:cron_history_size] : Sidekiq.options[:cron_history_size] - @history_size ||= (cron_history_size || 10).to_i - 1 + + @history_size ||= (Sidekiq::Options[:cron_history_size] || 10).to_i - 1 Sidekiq.redis do |conn| conn.lpush jid_history_key, Sidekiq.dump_json(jid_history) - # keep only last 10 entries in a fifo manner + # Keep only last 10 entries in a fifo manner. conn.ltrim jid_history_key, 0, @history_size end end - # remove job from cron jobs by name - # input: - # first arg: name (string) - name of job (must be same - case sensitive) def destroy Sidekiq.redis do |conn| - #delete from set + # Delete from set. conn.srem self.class.jobs_key, redis_key - #delete runned timestamps + # Delete runned timestamps. conn.del job_enqueued_key - # delete jid_history + # Delete jid_history. conn.del jid_history_key - #delete main job + # Delete main job. conn.del redis_key end + Sidekiq.logger.info { "Cron Jobs - deleted job with name: #{@name}" } end - # remove all job from cron + # Remove all job from cron. def self.destroy_all! all.each do |job| job.destroy end Sidekiq.logger.info { "Cron Jobs - deleted all jobs" } end - # remove "removed jobs" between current jobs and new jobs + # Remove "removed jobs" between current jobs and new jobs def self.destroy_removed_jobs new_job_names current_job_names = Sidekiq::Cron::Job.all.map(&:name) removed_job_names = current_job_names - new_job_names removed_job_names.each { |j| Sidekiq::Cron::Job.destroy(j) } removed_job_names @@ -555,36 +564,47 @@ end private def parsed_cron - @parsed_cron ||= Fugit.parse_cron(@cron) + @parsed_cron ||= begin + c = Fugit.parse(@cron) + + # Since `Fugit.parse` might yield a Fugit::Duration or an EtOrbi::EoTime + # https://github.com/floraison/fugit#fugitparses + if c.is_a?(Fugit::Cron) + c + else + errors << "'cron' -> #{@cron.inspect} -> not a cron but a #{c.class}" + end + rescue => e + errors << "'cron' -> #{@cron.inspect} -> #{e.class}: #{e.message}" + end end def not_enqueued_after?(time) @last_enqueue_time.nil? || @last_enqueue_time.to_i < last_time(time).to_i end # Try parsing inbound args into an array. - # args from Redis will be encoded JSON; - # try to load JSON, then failover - # to string array. + # Args from Redis will be encoded JSON, + # try to load JSON, then failover to string array. def parse_args(args) case args when String begin parsed_args = Sidekiq.load_json(args) symbolize_args? ? symbolize_args(parsed_args) : parsed_args rescue JSON::ParserError - [*args] # cast to string array + [*args] end when Hash symbolize_args? ? [symbolize_args(args)] : [args] when Array symbolize_args? ? symbolize_args(args) : args else - [*args] # cast to string array + [*args] end end def symbolize_args? @symbolize_args @@ -612,55 +632,48 @@ DateTime.parse(timestamp).to_time.utc end def not_past_scheduled_time?(current_time) last_cron_time = parsed_cron.previous_time(current_time).utc - # or could it be? - #last_cron_time = last_time(current_time) return false if (current_time.to_i - last_cron_time.to_i) > 60 true end - # Redis key for set of all cron jobs + # Redis key for set of all cron jobs. def self.jobs_key "cron_jobs" end - # Redis key for storing one cron job + # Redis key for storing one cron job. def self.redis_key name "cron_job:#{name}" end - # Redis key for storing one cron job + # Redis key for storing one cron job. def redis_key self.class.redis_key @name end - # Redis key for storing one cron job run times - # (when poller added job to queue) + # Redis key for storing one cron job run times (when poller added job to queue) def self.job_enqueued_key name "cron_job:#{name}:enqueued" end def self.jid_history_key name "cron_job:#{name}:jid_history" end - # Redis key for storing one cron job run times - # (when poller added job to queue) def job_enqueued_key self.class.job_enqueued_key @name end def jid_history_key self.class.jid_history_key @name end - # Give Hash - # returns array for using it for redis.hmset + # Give Hash returns array for using it for redis.hmset def hash_to_redis hash hash.inject([]){ |arr,kv| arr + [kv[0], kv[1]] } end - end end end