Sha256: 1a122b69ba8e36f9458412fab80e148f7ebf4cc1a4b6bf5509740e8d4c12ae7f
Contents?: true
Size: 1.7 KB
Versions: 3
Compression:
Stored size: 1.7 KB
Contents
module QPush
  module Server
    # The Delay worker requeues any jobs that have been delayed on our Redis
    # server. Delayed jobs are pulled by a 'zrangebyscore', with the score
    # representing the time the job should be performed.
    #
    class Delay
      def initialize
        @done = false
        @conn = nil
      end

      # Starts our delay process. This will run until instructed to stop.
      #
      def start
        until @done
          QPush.redis.with do |conn|
            @conn = conn
            watch_delay { retrieve_delays }
          end
          sleep 2
        end
      end

      # Shuts down our delay process.
      #
      def shutdown
        @done = true
      end

      private

      # Retrieves delayed jobs based on the time they should be performed.
      # If any are found, begin to update them.
      #
      def retrieve_delays
        delays = @conn.zrangebyscore(QPush.config.delay_namespace, 0, Time.now.to_i)
        delays.any? ? update_delays(delays) : @conn.unwatch
      end

      # Removes jobs that have been retrieved and sets them up to be performed.
      #
      def update_delays(delays)
        @conn.multi do |multi|
          multi.zrem(QPush.config.delay_namespace, delays)
          delays.each { |job| perform_job(job) }
        end
      end

      # Adds a delayed job to the appropriate perform list.
      #
      def perform_job(json)
        job = Job.new(JSON.parse(json))
        job.api.perform
      rescue => e
        raise ServerError, e.message
      end

      # Performs a watch on our delay list.
      #
      def watch_delay
        @conn.watch(QPush.config.delay_namespace) do
          yield if block_given?
        end
      end
    end
  end
end
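
The class comment above describes pulling delayed jobs with 'zrangebyscore', where each member's score is the time the job should run. The following is a minimal sketch of that selection rule only. It assumes an in-memory stand-in for a Redis sorted set: `FakeSortedSet` and the `job-*` member names are hypothetical and not part of qpush, and no Redis connection, WATCH, or MULTI is involved.

```ruby
# Hypothetical in-memory stand-in for a single Redis sorted set.
# It only imitates the three commands relevant to the Delay worker.
class FakeSortedSet
  def initialize
    @scores = {} # member => score
  end

  # Imitates ZADD: stores (or overwrites) the score for a member.
  def zadd(score, member)
    @scores[member] = score
  end

  # Imitates ZRANGEBYSCORE: members with min <= score <= max,
  # ordered by score, then by member for equal scores.
  def zrangebyscore(min, max)
    @scores.select { |_member, score| score >= min && score <= max }
           .sort_by { |member, score| [score, member] }
           .map(&:first)
  end

  # Imitates ZREM with a list of members: returns how many were removed.
  def zrem(members)
    members.count { |member| !@scores.delete(member).nil? }
  end
end

now = 1_000 # fixed "current time" so the example is deterministic

delays = FakeSortedSet.new
delays.zadd(now - 10, 'job-a') # due 10 seconds ago
delays.zadd(now,      'job-b') # due right now
delays.zadd(now + 60, 'job-c') # due in one minute

# Same range the worker uses: everything scored from 0 up to "now".
due = delays.zrangebyscore(0, now)
removed = delays.zrem(due)
remaining = delays.zrangebyscore(0, now + 3_600)
```

In this sketch `due` holds the two jobs whose score is at or before `now`, and only the future job stays in the set. The real worker additionally wraps the read in a WATCH and the removal in a MULTI so that two workers do not requeue the same job.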
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
qpush-0.1.2 | lib/qpush/server/delay.rb |
qpush-0.1.1 | lib/qpush/server/delay.rb |
qpush-0.1.0 | lib/qpush/server/delay.rb |