lib/sidekiq/cron/job.rb in sidekiq-cron-0.4.2 vs lib/sidekiq/cron/job.rb in sidekiq-cron-0.4.3

- old
+ new

@@ -70,14 +70,22 @@ end # active job has different structure how it is loading data from sidekiq # queue, it createaswrapper arround job def active_job_message + if !"#{@active_job_queue_name_delimiter}".empty? + queue_name_delimiter = @active_job_queue_name_delimiter + elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_delimiter) && !ActiveJob::Base.queue_name_delimiter.empty? + queue_name_delimiter = ActiveJob::Base.queue_name_delimiter + else + queue_name_delimiter = '_' + end + if !"#{@active_job_queue_name_prefix}".empty? - queue_name = "#{@active_job_queue_name_prefix}_#{@queue}" + queue_name = "#{@active_job_queue_name_prefix}#{queue_name_delimiter}#{@queue}" elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_prefix) && !"#{ActiveJob::Base.queue_name_prefix}".empty? - queue_name = "#{ActiveJob::Base.queue_name_prefix}_#{@queue}" + queue_name = "#{ActiveJob::Base.queue_name_prefix}#{queue_name_delimiter}#{@queue}" else queue_name = @queue end { @@ -157,21 +165,23 @@ load_from_array(array) end # get all cron jobs def self.all - out = [] + job_hashes = nil Sidekiq.redis do |conn| - out = conn.smembers(jobs_key).collect do |key| - if conn.exists key - Job.new conn.hgetall(key) - else - nil + set_members = conn.smembers(jobs_key) + job_hashes = conn.pipelined do + set_members.each do |key| + conn.hgetall(key) end end end - out.select{|j| !j.nil? } + job_hashes.compact.reject(&:empty?).collect do |h| + # no need to fetch missing args from redis since we just got this hash from there + Sidekiq::Cron::Job.new(h.merge(fetch_missing_args: false)) + end end def self.count out = 0 Sidekiq.redis do |conn| @@ -209,14 +219,16 @@ false end end attr_accessor :name, :cron, :description, :klass, :args, :message - attr_reader :last_enqueue_time + attr_reader :last_enqueue_time, :fetch_missing_args def initialize input_args = {} args = input_args.stringify_keys + @fetch_missing_args = args.delete('fetch_missing_args') + @fetch_missing_args = true if @fetch_missing_args.nil? @name = args["name"] @cron = args["cron"] @description = args["description"] if args["description"] @@ -236,10 +248,11 @@ #get right arguments for job @args = args["args"].nil? ? [] : parse_args( args["args"] ) @active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false @active_job_queue_name_prefix = args["queue_name_prefix"] + @active_job_queue_name_delimiter = args["queue_name_delimiter"] if args["message"] @message = args["message"] message_data = Sidekiq.load_json(@message) || {} @queue = message_data['queue'] || default @@ -300,21 +313,22 @@ !enabled? end def status_from_redis out = "enabled" - if exists? + if fetch_missing_args Sidekiq.redis do |conn| - out = conn.hget redis_key, "status" + status = conn.hget redis_key, "status" + out = status if status end end out end def last_enqueue_time_from_redis out = nil - if exists? + if fetch_missing_args Sidekiq.redis do |conn| out = Time.parse(conn.hget(redis_key, "last_enqueue_time")) rescue nil end end out @@ -330,10 +344,11 @@ args: @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || []), message: @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {}), status: @status, active_job: @active_job, queue_name_prefix: @active_job_queue_name_prefix, + queue_name_delimiter: @active_job_queue_name_delimiter, last_enqueue_time: @last_enqueue_time, } end def errors @@ -397,10 +412,10 @@ #add informations for this job! conn.hmset redis_key, *hash_to_redis(to_hash) #add information about last time! - don't enque right after scheduler poller starts! time = Time.now - conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) + conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless conn.exists(job_enqueued_key) end logger.info { "Cron Jobs - add job with name: #{@name}" } end # remove job from cron jobs by name