lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.4.0 vs lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.5.0
- old
+ new
@@ -22,20 +22,20 @@
# timestamp has passed. It respects Resque.inline option, by
# creating the job right away instead of adding to the queue.
def enqueue_at_with_queue(queue, timestamp, klass, *args)
return false unless plugin.run_before_schedule_hooks(klass, *args)
- if Resque.inline? || timestamp.to_i < Time.now.to_i
+ if Resque.inline? || timestamp.to_i <= Time.now.to_i
# Just create the job and let resque perform it right away with
# inline. If the class is a custom job class, call self#scheduled
# on it. This allows you to do things like
# Resque.enqueue_at(timestamp, CustomJobClass, :opt1 => val1).
# Otherwise, pass off to Resque.
if klass.respond_to?(:scheduled)
klass.scheduled(queue, klass.to_s, *args)
else
- Resque::Job.create(queue, klass, *args)
+ Resque.enqueue_to(queue, klass, *args)
end
else
delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args))
end
@@ -62,13 +62,13 @@
enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now,
klass, *args)
end
# Used internally to stuff the item into the schedule sorted list.
- # +timestamp+ can be either in seconds or a datetime object Insertion
- # if O(log(n)). Returns true if it's the first job to be scheduled at
- # that time, else false
+ # +timestamp+ can be either in seconds or a datetime object. The
+ # insertion time complexity is O(log(n)). Returns true if it's
+ # the first job to be scheduled at that time, else false.
def delayed_push(timestamp, item)
# First add this item to the list for this timestamp
redis.rpush("delayed:#{timestamp.to_i}", encode(item))
# Store the timestamps at with this item occurs
@@ -86,10 +86,11 @@
start + count - 1)
Array(result).map(&:to_i)
end
# Returns the size of the delayed queue schedule
+ # this does not represent the number of items in the queue to be scheduled
def delayed_queue_schedule_size
redis.zcard :delayed_queue_schedule
end
# Returns the number of jobs for a given timestamp in the delayed queue
@@ -147,18 +148,30 @@
def remove_delayed(klass, *args)
search = encode(job_to_hash(klass, args))
remove_delayed_job(search)
end
+ def remove_delayed_in_queue(klass, queue, *args)
+ search = encode(job_to_hash_with_queue(queue, klass, args))
+ remove_delayed_job(search)
+ end
+
# Given an encoded item, enqueue it now
def enqueue_delayed(klass, *args)
hash = job_to_hash(klass, args)
remove_delayed(klass, *args).times do
Resque::Scheduler.enqueue_from_config(hash)
end
end
+ def enqueue_delayed_with_queue(klass, queue, *args)
+ hash = job_to_hash_with_queue(queue, klass, args)
+ remove_delayed_in_queue(klass, queue, *args).times do
+ Resque::Scheduler.enqueue_from_config(hash)
+ end
+ end
+
# Given a block, remove jobs that return true from a block
#
# This allows for removal of delayed jobs that have arguments matching
# certain criteria
def remove_delayed_selection(klass = nil)
@@ -179,11 +192,19 @@
found_jobs = find_delayed_selection(klass) { |args| yield(args) }
found_jobs.reduce(0) do |sum, encoded_job|
decoded_job = decode(encoded_job)
klass = Util.constantize(decoded_job['class'])
- sum + enqueue_delayed(klass, *decoded_job['args'])
+ queue = decoded_job['queue']
+
+ if queue
+ jobs_queued = enqueue_delayed_with_queue(klass, queue, *decoded_job['args'])
+ else
+ jobs_queued = enqueue_delayed(klass, *decoded_job['args'])
+ end
+
+ jobs_queued + sum
end
end
# Given a block, find jobs that return true from a block
#
@@ -269,10 +290,12 @@
def job_to_hash_with_queue(queue, klass, args)
{ class: klass.to_s, args: args, queue: queue }
end
+ # Removes a job from the queue, but not modify the timestamp schedule. This method
+ # will not effect the output of `delayed_queue_schedule_size`
def remove_delayed_job(encoded_job)
return 0 if Resque.inline?
timestamps = redis.smembers("timestamps:#{encoded_job}")
@@ -280,9 +303,12 @@
timestamps.each do |key|
redis.lrem(key, 0, encoded_job)
redis.srem("timestamps:#{encoded_job}", key)
end
end
+
+ # timestamp key is not removed from the schedule, this is done later
+ # by the scheduler loop
return 0 if replies.nil? || replies.empty?
replies.each_slice(2).map(&:first).inject(:+)
end