lib/sidekiq/cron/job.rb in sidekiq-cron-2.0.0.rc1 vs lib/sidekiq/cron/job.rb in sidekiq-cron-2.0.0.rc2

- old
+ new

@@ -1,26 +1,21 @@ # frozen_string_literal: true require 'fugit' require 'cronex' require 'globalid' -require 'sidekiq' require 'sidekiq/cron/support' -require 'sidekiq/options' module Sidekiq module Cron class Job # How long we would like to store information about previous enqueues. REMEMBER_THRESHOLD = 24 * 60 * 60 # Time format for enqueued jobs. LAST_ENQUEUE_TIME_FORMAT = '%Y-%m-%d %H:%M:%S %z' - # Use the exists? method if we're on a newer version of Redis. - REDIS_EXISTS_METHOD = Gem::Version.new(Sidekiq::VERSION) >= Gem::Version.new("7.0.0") || Gem.loaded_specs['redis'].version < Gem::Version.new('4.2') ? :exists : :exists? - # Use serialize/deserialize key of GlobalID. GLOBALID_KEY = "_sc_globalid" attr_accessor :name, :namespace, :cron, :description, :klass, :args, :message attr_reader :last_enqueue_time, :fetch_missing_args, :source @@ -90,11 +85,11 @@ @queue_name_with_prefix = queue_name_with_prefix end # Crucial part of whole enqueuing job. - def should_enque? time + def should_enqueue? time return false unless status == "enabled" return false if past_scheduled_time?(time) return false if enqueued_after?(time) enqueue = Sidekiq.redis do |conn| @@ -103,27 +98,27 @@ enqueue == true || enqueue == 1 end # Remove previous information about run times, # this will clear Redis and make sure that Redis will not overflow with memory. - def remove_previous_enques time + def remove_previous_enqueues time Sidekiq.redis do |conn| conn.zremrangebyscore(job_enqueued_key, 0, "(#{(time.to_f - REMEMBER_THRESHOLD).to_s}") end end # Test if job should be enqueued. - def test_and_enque_for_time! time - if should_enque?(time) - enque! + def test_and_enqueue_for_time! time + if should_enqueue?(time) + enqueue! - remove_previous_enques(time) + remove_previous_enqueues(time) end end # Enqueue cron job to queue. - def enque! time = Time.now.utc + def enqueue! time = Time.now.utc @last_enqueue_time = time klass_const = begin Sidekiq::Cron::Support.constantize(@klass.to_s) @@ -411,32 +406,27 @@ end end # Export job data to hash. def to_hash - hash = { + { name: @name, namespace: @namespace, klass: @klass.to_s, cron: @cron, description: @description, source: @source, args: @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || []), + date_as_argument: date_as_argument? ? "1" : "0", message: @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {}), status: @status, active_job: @active_job ? "1" : "0", queue_name_prefix: @active_job_queue_name_prefix, queue_name_delimiter: @active_job_queue_name_delimiter, last_enqueue_time: serialized_last_enqueue_time, symbolize_args: symbolize_args? ? "1" : "0", } - - if date_as_argument? - hash.merge!(date_as_argument: "1") - end - - hash end def errors @errors ||= [] end @@ -445,10 +435,11 @@ # Clear previous errors. @errors = [] errors << "'name' must be set" if @name.nil? || @name.size == 0 errors << "'namespace' must be set" if @namespace.nil? || @namespace.size == 0 + errors << "'namespace' cannot be '*'" if @namespace == "*" if @cron.nil? || @cron.size == 0 errors << "'cron' must be set" else begin @@ -480,18 +471,23 @@ Sidekiq.redis do |conn| # Add to set of all jobs conn.sadd self.class.jobs_key(@namespace), [redis_key] # Add information for this job! - conn.hset redis_key, to_hash.transform_values! { |v| v || '' } + conn.hset redis_key, to_hash.transform_values! { |v| v || '' }.flatten - # Add information about last time! - don't enque right after scheduler poller starts! + # Add information about last time! - don't enqueue right after scheduler poller starts! time = Time.now.utc - exists = conn.public_send(REDIS_EXISTS_METHOD, job_enqueued_key) - conn.zadd(job_enqueued_key, time.to_f.to_s, formatted_last_time(time).to_s) unless exists == true || exists == 1 + exists = conn.exists(job_enqueued_key) + + unless exists == true || exists == 1 + conn.zadd(job_enqueued_key, time.to_f.to_s, formatted_last_time(time).to_s) + Sidekiq.logger.info { "Cron Jobs - added job with name #{@name} in the namespace #{@namespace}" } + end end - Sidekiq.logger.info { "Cron Jobs - added job with name #{@name} in the namespace #{@namespace}" } + + true end def save_last_enqueue_time Sidekiq.redis do |conn| # Update last enqueue time. @@ -503,11 +499,11 @@ jid_history = { jid: jid, enqueued: @last_enqueue_time } - @history_size ||= (Sidekiq::Options[:cron_history_size] || 10).to_i - 1 + @history_size ||= Sidekiq::Cron.configuration.cron_history_size.to_i - 1 Sidekiq.redis do |conn| conn.lpush jid_history_key, Sidekiq.dump_json(jid_history) # Keep only last 10 entries in a fifo manner. conn.ltrim jid_history_key, 0, @history_size @@ -570,10 +566,10 @@ last_time(now).getutc.iso8601 end def self.exists?(name, namespace = Sidekiq::Cron.configuration.default_namespace) out = Sidekiq.redis do |conn| - conn.public_send(REDIS_EXISTS_METHOD, redis_key(name, namespace)) + conn.exists(redis_key(name, namespace)) end [true, 1].include?(out) end