Sha256: 9cdc64f2635593781442a0a2b43e3733d33e87ddee18ed1eff1080fdd8c061e4
Contents?: true
Size: 1.96 KB
Versions: 1
Compression:
Stored size: 1.96 KB
Contents
module Resque
  module Plugins
    # Helpers for pausing and resuming a Resque queue by renaming its Redis
    # list key. The methods read @queue on the receiver, so the module is
    # presumably meant to be extended into a job class that sets @queue --
    # confirm against callers.
    module RateLimitedQueue
      # Prefix prepended to a queue name to build the Redis key used here.
      RESQUE_PREFIX = 'queue:'
      # Lock name passed to RedisMutex by with_lock.
      MUTEX = 'Resque::Plugins::RateLimitedQueue'

      # Runs the given block unless the queue is paused. When paused, the
      # job (self plus params) is enqueued onto the paused queue instead and
      # the block is not run. The pause check and the re-enqueue happen
      # under with_lock; the block itself runs outside the lock.
      def around_perform_with_check_and_requeue(*params)
        paused = false
        with_lock do
          paused = paused?
          Resque.enqueue_to(paused_queue_name, self, *params) if paused
        end
        return if paused
        yield
      end

      # Enqueues klass with params onto the paused queue when paused,
      # otherwise onto @queue. The check and the enqueue share one lock.
      def rate_limited_enqueue(klass, *params)
        with_lock do
          if paused?
            Resque.enqueue_to(paused_queue_name, klass, *params)
          else
            Resque.enqueue_to(@queue, klass, *params)
          end
        end
      end

      def rate_limited_requeue(klass, *params)
        # split from above to make it easy to stub for testing
        rate_limited_enqueue(klass, *params)
      end

      # Pauses the queue and, only when pause returns true, calls
      # UnPause.enqueue with the timestamp and the receiver's name.
      # UnPause is defined elsewhere -- presumably it schedules un_pause.
      def pause_for(timestamp)
        UnPause.enqueue(timestamp, name) if pause
      end

      # Renames the paused queue key back to the live queue key.
      # Returns true when the rename command does not raise, false when
      # Redis reports 'ERR no such key'; other command errors are re-raised.
      # The boolean reply of renamenx itself is not inspected.
      def un_pause
        Resque.redis.renamenx(RESQUE_PREFIX + paused_queue_name, RESQUE_PREFIX + @queue.to_s)
        true
      rescue Redis::CommandError => e
        raise unless e.message == 'ERR no such key'
        false
      end

      # Renames the live queue key to the paused queue key.
      # Returns true when the rename command does not raise, false when
      # Redis reports 'ERR no such key'; other command errors are re-raised.
      # The boolean reply of renamenx itself is not inspected.
      def pause
        Resque.redis.renamenx(RESQUE_PREFIX + @queue.to_s, RESQUE_PREFIX + paused_queue_name)
        true
      rescue Redis::CommandError => e
        raise unless e.message == 'ERR no such key'
        false
      end

      # Returns true when the paused queue key exists in Redis, else false.
      #
      # Depending on the client version, `exists` replies with either a
      # boolean or an Integer count of matching keys. An Integer 0 is truthy
      # in Ruby, so the reply is normalised to a strict boolean here rather
      # than returned as-is.
      def paused?
        reply = Resque.redis.exists(RESQUE_PREFIX + paused_queue_name)
        return reply > 0 if reply.is_a?(Integer)
        reply ? true : false
      end

      # Name of the queue that holds jobs while the queue is paused.
      def paused_queue_name
        @queue.to_s + '_paused'
      end

      # Yields directly when Resque.inline is set; otherwise yields while
      # holding the MUTEX lock via RedisMutex (block: 60, expire: 120 --
      # presumably seconds; confirm against the RedisMutex documentation).
      def with_lock
        if Resque.inline
          yield
        else
          RedisMutex.with_lock(MUTEX, block: 60, expire: 120) { yield }
        end
      end

      # Returns klass unchanged when it is already a Class; otherwise
      # resolves it as a constant name, walking '::'-separated segments
      # starting from Object.
      def find_class(klass)
        return klass if klass.is_a? Class
        return Object.const_get(klass) unless klass.include?('::')
        klass.split('::').reduce(Object) do |mod, class_name|
          mod.const_get(class_name)
        end
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
resque-rate_limited_queue-0.0.34 | lib/resque/plugins/rate_limited_queue/rate_limited_queue.rb |