lib/sidekiq/cron/job.rb in sidekiq-cron-0.4.2 vs lib/sidekiq/cron/job.rb in sidekiq-cron-0.4.3
- old
+ new
@@ -70,14 +70,22 @@
end
# active job has different structure how it is loading data from sidekiq
# queue, it createaswrapper arround job
def active_job_message
+ if !"#{@active_job_queue_name_delimiter}".empty?
+ queue_name_delimiter = @active_job_queue_name_delimiter
+ elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_delimiter) && !ActiveJob::Base.queue_name_delimiter.empty?
+ queue_name_delimiter = ActiveJob::Base.queue_name_delimiter
+ else
+ queue_name_delimiter = '_'
+ end
+
if !"#{@active_job_queue_name_prefix}".empty?
- queue_name = "#{@active_job_queue_name_prefix}_#{@queue}"
+ queue_name = "#{@active_job_queue_name_prefix}#{queue_name_delimiter}#{@queue}"
elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_prefix) && !"#{ActiveJob::Base.queue_name_prefix}".empty?
- queue_name = "#{ActiveJob::Base.queue_name_prefix}_#{@queue}"
+ queue_name = "#{ActiveJob::Base.queue_name_prefix}#{queue_name_delimiter}#{@queue}"
else
queue_name = @queue
end
{
@@ -157,21 +165,23 @@
load_from_array(array)
end
# get all cron jobs
def self.all
- out = []
+ job_hashes = nil
Sidekiq.redis do |conn|
- out = conn.smembers(jobs_key).collect do |key|
- if conn.exists key
- Job.new conn.hgetall(key)
- else
- nil
+ set_members = conn.smembers(jobs_key)
+ job_hashes = conn.pipelined do
+ set_members.each do |key|
+ conn.hgetall(key)
end
end
end
- out.select{|j| !j.nil? }
+ job_hashes.compact.reject(&:empty?).collect do |h|
+ # no need to fetch missing args from redis since we just got this hash from there
+ Sidekiq::Cron::Job.new(h.merge(fetch_missing_args: false))
+ end
end
def self.count
out = 0
Sidekiq.redis do |conn|
@@ -209,14 +219,16 @@
false
end
end
attr_accessor :name, :cron, :description, :klass, :args, :message
- attr_reader :last_enqueue_time
+ attr_reader :last_enqueue_time, :fetch_missing_args
def initialize input_args = {}
args = input_args.stringify_keys
+ @fetch_missing_args = args.delete('fetch_missing_args')
+ @fetch_missing_args = true if @fetch_missing_args.nil?
@name = args["name"]
@cron = args["cron"]
@description = args["description"] if args["description"]
@@ -236,10 +248,11 @@
#get right arguments for job
@args = args["args"].nil? ? [] : parse_args( args["args"] )
@active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
@active_job_queue_name_prefix = args["queue_name_prefix"]
+ @active_job_queue_name_delimiter = args["queue_name_delimiter"]
if args["message"]
@message = args["message"]
message_data = Sidekiq.load_json(@message) || {}
@queue = message_data['queue'] || default
@@ -300,21 +313,22 @@
!enabled?
end
def status_from_redis
out = "enabled"
- if exists?
+ if fetch_missing_args
Sidekiq.redis do |conn|
- out = conn.hget redis_key, "status"
+ status = conn.hget redis_key, "status"
+ out = status if status
end
end
out
end
def last_enqueue_time_from_redis
out = nil
- if exists?
+ if fetch_missing_args
Sidekiq.redis do |conn|
out = Time.parse(conn.hget(redis_key, "last_enqueue_time")) rescue nil
end
end
out
@@ -330,10 +344,11 @@
args: @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || []),
message: @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {}),
status: @status,
active_job: @active_job,
queue_name_prefix: @active_job_queue_name_prefix,
+ queue_name_delimiter: @active_job_queue_name_delimiter,
last_enqueue_time: @last_enqueue_time,
}
end
def errors
@@ -397,10 +412,10 @@
#add informations for this job!
conn.hmset redis_key, *hash_to_redis(to_hash)
#add information about last time! - don't enque right after scheduler poller starts!
time = Time.now
- conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s)
+ conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless conn.exists(job_enqueued_key)
end
logger.info { "Cron Jobs - add job with name: #{@name}" }
end
# remove job from cron jobs by name