lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.4.0 vs lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.5.0

- old
+ new

@@ -22,20 +22,20 @@ # timestamp has passed. It respects Resque.inline option, by # creating the job right away instead of adding to the queue. def enqueue_at_with_queue(queue, timestamp, klass, *args) return false unless plugin.run_before_schedule_hooks(klass, *args) - if Resque.inline? || timestamp.to_i < Time.now.to_i + if Resque.inline? || timestamp.to_i <= Time.now.to_i # Just create the job and let resque perform it right away with # inline. If the class is a custom job class, call self#scheduled # on it. This allows you to do things like # Resque.enqueue_at(timestamp, CustomJobClass, :opt1 => val1). # Otherwise, pass off to Resque. if klass.respond_to?(:scheduled) klass.scheduled(queue, klass.to_s, *args) else - Resque::Job.create(queue, klass, *args) + Resque.enqueue_to(queue, klass, *args) end else delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args)) end @@ -62,13 +62,13 @@ enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now, klass, *args) end # Used internally to stuff the item into the schedule sorted list. - # +timestamp+ can be either in seconds or a datetime object Insertion - # if O(log(n)). Returns true if it's the first job to be scheduled at - # that time, else false + # +timestamp+ can be either in seconds or a datetime object. The + # insertion time complexity is O(log(n)). Returns true if it's + # the first job to be scheduled at that time, else false. def delayed_push(timestamp, item) # First add this item to the list for this timestamp redis.rpush("delayed:#{timestamp.to_i}", encode(item)) # Store the timestamps at with this item occurs @@ -86,10 +86,11 @@ start + count - 1) Array(result).map(&:to_i) end # Returns the size of the delayed queue schedule + # this does not represent the number of items in the queue to be scheduled def delayed_queue_schedule_size redis.zcard :delayed_queue_schedule end # Returns the number of jobs for a given timestamp in the delayed queue @@ -147,18 +148,30 @@ def remove_delayed(klass, *args) search = encode(job_to_hash(klass, args)) remove_delayed_job(search) end + def remove_delayed_in_queue(klass, queue, *args) + search = encode(job_to_hash_with_queue(queue, klass, args)) + remove_delayed_job(search) + end + # Given an encoded item, enqueue it now def enqueue_delayed(klass, *args) hash = job_to_hash(klass, args) remove_delayed(klass, *args).times do Resque::Scheduler.enqueue_from_config(hash) end end + def enqueue_delayed_with_queue(klass, queue, *args) + hash = job_to_hash_with_queue(queue, klass, args) + remove_delayed_in_queue(klass, queue, *args).times do + Resque::Scheduler.enqueue_from_config(hash) + end + end + # Given a block, remove jobs that return true from a block # # This allows for removal of delayed jobs that have arguments matching # certain criteria def remove_delayed_selection(klass = nil) @@ -179,11 +192,19 @@ found_jobs = find_delayed_selection(klass) { |args| yield(args) } found_jobs.reduce(0) do |sum, encoded_job| decoded_job = decode(encoded_job) klass = Util.constantize(decoded_job['class']) - sum + enqueue_delayed(klass, *decoded_job['args']) + queue = decoded_job['queue'] + + if queue + jobs_queued = enqueue_delayed_with_queue(klass, queue, *decoded_job['args']) + else + jobs_queued = enqueue_delayed(klass, *decoded_job['args']) + end + + jobs_queued + sum end end # Given a block, find jobs that return true from a block # @@ -269,10 +290,12 @@ def job_to_hash_with_queue(queue, klass, args) { class: klass.to_s, args: args, queue: queue } end + # Removes a job from the queue, but not modify the timestamp schedule. This method + # will not effect the output of `delayed_queue_schedule_size` def remove_delayed_job(encoded_job) return 0 if Resque.inline? timestamps = redis.smembers("timestamps:#{encoded_job}") @@ -280,9 +303,12 @@ timestamps.each do |key| redis.lrem(key, 0, encoded_job) redis.srem("timestamps:#{encoded_job}", key) end end + + # timestamp key is not removed from the schedule, this is done later + # by the scheduler loop return 0 if replies.nil? || replies.empty? replies.each_slice(2).map(&:first).inject(:+) end