lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.8.0 vs lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.9.0
- old
+ new
@@ -88,11 +88,11 @@
def delayed_push(timestamp, item)
# First add this item to the list for this timestamp
redis.rpush("delayed:#{timestamp.to_i}", encode(item))
# Store the timestamps at with this item occurs
- redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}")
+ redis.sadd("timestamps:#{encode(item)}", ["delayed:#{timestamp.to_i}"])
# Now, add this timestamp to the zsets. The score and the value are
# the same since we'll be querying by timestamp, and we don't have
# anything else to store.
redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
@@ -138,11 +138,11 @@
# +timestamp+ can either be in seconds or a datetime
def next_item_for_timestamp(timestamp)
key = "delayed:#{timestamp.to_i}"
encoded_item = redis.lpop(key)
- redis.srem("timestamps:#{encoded_item}", key)
+ redis.srem("timestamps:#{encoded_item}", [key])
item = decode(encoded_item)
# If the list is empty, remove it.
clean_up_timestamp(key, timestamp)
item
@@ -255,11 +255,11 @@
return 0 if Resque.inline?
key = "delayed:#{timestamp.to_i}"
encoded_job = encode(job_to_hash(klass, args))
- redis.srem("timestamps:#{encoded_job}", key)
+ redis.srem("timestamps:#{encoded_job}", [key])
count = redis.lrem(key, 0, encoded_job)
clean_up_timestamp(key, timestamp)
count
end
@@ -295,10 +295,26 @@
def get_last_enqueued_at(job_name)
redis.hget('delayed:last_enqueued_at', job_name)
end
+ def clean_up_timestamp(key, timestamp)
+ # Use a watch here to ensure nobody adds jobs to this delayed
+ # queue while we're removing it.
+ redis.watch(key) do
+ if redis.llen(key).to_i == 0
+ # If the list is empty, remove it.
+ redis.multi do |transaction|
+ transaction.del(key)
+ transaction.zrem(:delayed_queue_schedule, timestamp.to_i)
+ end
+ else
+ redis.redis.unwatch
+ end
+ end
+ end
+
private
def job_to_hash(klass, args)
{ class: klass.to_s, args: args, queue: queue_from_class(klass) }
end
@@ -315,34 +331,18 @@
timestamps = redis.smembers("timestamps:#{encoded_job}")
replies = redis.pipelined do |pipeline|
timestamps.each do |key|
pipeline.lrem(key, 0, encoded_job)
- pipeline.srem("timestamps:#{encoded_job}", key)
+ pipeline.srem("timestamps:#{encoded_job}", [key])
end
end
# timestamp key is not removed from the schedule, this is done later
# by the scheduler loop
return 0 if replies.nil? || replies.empty?
replies.each_slice(2).map(&:first).inject(:+)
- end
-
- def clean_up_timestamp(key, timestamp)
- # Use a watch here to ensure nobody adds jobs to this delayed
- # queue while we're removing it.
- redis.watch(key) do
- if redis.llen(key).to_i == 0
- # If the list is empty, remove it.
- redis.multi do |transaction|
- transaction.del(key)
- transaction.zrem(:delayed_queue_schedule, timestamp.to_i)
- end
- else
- redis.redis.unwatch
- end
- end
end
def search_first_delayed_timestamp_in_range(start_at, stop_at)
start_at = start_at.nil? ? '-inf' : start_at.to_i
stop_at = stop_at.nil? ? '+inf' : stop_at.to_i