lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.5.0 vs lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.6.0

- old
+ new

@@ -61,10 +61,28 @@ end enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now, klass, *args) end + # Update the delayed timestamp of any matching delayed jobs or enqueue a + # new job if no matching jobs are found. Returns the number of delayed or + # enqueued jobs. + def delay_or_enqueue_at(timestamp, klass, *args) + count = remove_delayed(klass, *args) + count = 1 if count == 0 + + count.times do + enqueue_at(timestamp, klass, *args) + end + end + + # Identical to +delay_or_enqueue_at+, except it takes + # number_of_seconds_from_now instead of a timestamp + def delay_or_enqueue_in(number_of_seconds_from_now, klass, *args) + delay_or_enqueue_at(Time.now + number_of_seconds_from_now, klass, *args) + end + # Used internally to stuff the item into the schedule sorted list. # +timestamp+ can be either in seconds or a datetime object. The # insertion time complexity is O(log(n)). Returns true if it's # the first job to be scheduled at that time, else false. def delayed_push(timestamp, item) @@ -133,14 +151,11 @@ # Clears all jobs created with enqueue_at or enqueue_in def reset_delayed_queue Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item| key = "delayed:#{item}" items = redis.lrange(key, 0, -1) - redis.pipelined do - items.each { |ts_item| redis.del("timestamps:#{ts_item}") } - end - redis.del key + redis.del(key, items.map { |ts_item| "timestamps:#{ts_item}" }) end redis.del :delayed_queue_schedule end @@ -215,13 +230,13 @@ timestamps = redis.zrange(:delayed_queue_schedule, 0, -1) # Beyond 100 there's almost no improvement in speed found = timestamps.each_slice(100).map do |ts_group| - jobs = redis.pipelined do |r| + jobs = redis.pipelined do |pipeline| ts_group.each do |ts| - r.lrange("delayed:#{ts}", 0, -1) + pipeline.lrange("delayed:#{ts}", 0, -1) end end jobs.flatten.select do |payload| payload_matches_selection?(decode(payload), klass, &block) @@ -297,14 +312,14 @@ def remove_delayed_job(encoded_job) return 0 if Resque.inline? timestamps = redis.smembers("timestamps:#{encoded_job}") - replies = redis.pipelined do + replies = redis.pipelined do |pipeline| timestamps.each do |key| - redis.lrem(key, 0, encoded_job) - redis.srem("timestamps:#{encoded_job}", key) + pipeline.lrem(key, 0, encoded_job) + pipeline.srem("timestamps:#{encoded_job}", key) end end # timestamp key is not removed from the schedule, this is done later # by the scheduler loop @@ -317,12 +332,12 @@ # Use a watch here to ensure nobody adds jobs to this delayed # queue while we're removing it. redis.watch(key) do if redis.llen(key).to_i == 0 # If the list is empty, remove it. - redis.multi do - redis.del(key) - redis.zrem(:delayed_queue_schedule, timestamp.to_i) + redis.multi do |transaction| + transaction.del(key) + transaction.zrem(:delayed_queue_schedule, timestamp.to_i) end else redis.redis.unwatch end end