lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.5.0 vs lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.6.0
- old
+ new
@@ -61,10 +61,28 @@
end
enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now,
klass, *args)
end
+ # Update the delayed timestamp of any matching delayed jobs or enqueue a
+ # new job if no matching jobs are found. Returns the number of delayed or
+ # enqueued jobs.
+ def delay_or_enqueue_at(timestamp, klass, *args)
+ count = remove_delayed(klass, *args)
+ count = 1 if count == 0
+
+ count.times do
+ enqueue_at(timestamp, klass, *args)
+ end
+ end
+
+ # Identical to +delay_or_enqueue_at+, except it takes
+ # number_of_seconds_from_now instead of a timestamp
+ def delay_or_enqueue_in(number_of_seconds_from_now, klass, *args)
+ delay_or_enqueue_at(Time.now + number_of_seconds_from_now, klass, *args)
+ end
+
# Used internally to stuff the item into the schedule sorted list.
# +timestamp+ can be either in seconds or a datetime object. The
# insertion time complexity is O(log(n)). Returns true if it's
# the first job to be scheduled at that time, else false.
def delayed_push(timestamp, item)
@@ -133,14 +151,11 @@
# Clears all jobs created with enqueue_at or enqueue_in
def reset_delayed_queue
Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item|
key = "delayed:#{item}"
items = redis.lrange(key, 0, -1)
- redis.pipelined do
- items.each { |ts_item| redis.del("timestamps:#{ts_item}") }
- end
- redis.del key
+ redis.del(key, items.map { |ts_item| "timestamps:#{ts_item}" })
end
redis.del :delayed_queue_schedule
end
@@ -215,13 +230,13 @@
timestamps = redis.zrange(:delayed_queue_schedule, 0, -1)
# Beyond 100 there's almost no improvement in speed
found = timestamps.each_slice(100).map do |ts_group|
- jobs = redis.pipelined do |r|
+ jobs = redis.pipelined do |pipeline|
ts_group.each do |ts|
- r.lrange("delayed:#{ts}", 0, -1)
+ pipeline.lrange("delayed:#{ts}", 0, -1)
end
end
jobs.flatten.select do |payload|
payload_matches_selection?(decode(payload), klass, &block)
@@ -297,14 +312,14 @@
def remove_delayed_job(encoded_job)
return 0 if Resque.inline?
timestamps = redis.smembers("timestamps:#{encoded_job}")
- replies = redis.pipelined do
+ replies = redis.pipelined do |pipeline|
timestamps.each do |key|
- redis.lrem(key, 0, encoded_job)
- redis.srem("timestamps:#{encoded_job}", key)
+ pipeline.lrem(key, 0, encoded_job)
+ pipeline.srem("timestamps:#{encoded_job}", key)
end
end
# timestamp key is not removed from the schedule, this is done later
# by the scheduler loop
@@ -317,12 +332,12 @@
# Use a watch here to ensure nobody adds jobs to this delayed
# queue while we're removing it.
redis.watch(key) do
if redis.llen(key).to_i == 0
# If the list is empty, remove it.
- redis.multi do
- redis.del(key)
- redis.zrem(:delayed_queue_schedule, timestamp.to_i)
+ redis.multi do |transaction|
+ transaction.del(key)
+ transaction.zrem(:delayed_queue_schedule, timestamp.to_i)
end
else
redis.redis.unwatch
end
end