Sha256: 736dd10b8788e826dd23fe1f5a0e665c54316cc5c9f6d6e069762ec2cd91b5fe
Contents?: true
Size: 1.95 KB
Versions: 2
Compression:
Stored size: 1.95 KB
Contents
module Afterparty
  # Helpers for reading and manipulating a queue through Afterparty.redis_call.
  #
  # The including object is expected to set @options with a :namespace key;
  # that namespace is used for every call unless a one-shot override has been
  # stored in @temp_namespace (see #[]).
  #
  # NOTE(review): the command names used here (:zrange, :zrangebyscore,
  # :zcount, :del) look like Redis sorted-set commands; the exact semantics
  # depend on Afterparty.redis_call, which is defined elsewhere.
  module QueueHelpers
    # Stores +namespace+ as a one-shot override consumed by the next
    # #redis_call.
    #
    # Returns the namespace that was passed in (not self).
    def [](namespace)
      @temp_namespace = namespace
    end

    # Returns the queue name that Afterparty.redis_queue_name derives from the
    # override namespace, or from @options[:namespace] when no override is set.
    # Unlike #redis_call, this does not clear the override.
    def redis_queue_name
      Afterparty.redis_queue_name(@temp_namespace || @options[:namespace])
    end

    # Issues :del against the current namespace.
    def clear
      redis_call :del
    end

    # Forwards +command+ and +args+ to Afterparty.redis_call using the override
    # namespace if one is set, otherwise @options[:namespace].
    #
    # The override is cleared in an +ensure+ so that a failing call cannot
    # leave it behind to silently redirect the next, unrelated call.
    #
    # Returns whatever Afterparty.redis_call returns.
    def redis_call(command, *args)
      Afterparty.redis_call((@temp_namespace || @options[:namespace]), command, *args)
    ensure
      @temp_namespace = nil
    end

    # Hands +block+ to Afterparty.redis.pipelined.
    def async_redis_call(&block)
      Afterparty.redis.pipelined &block
    end

    # Returns every entry of the queue (range 0..-1), each passed through
    # Marshal.load.
    #
    # NOTE(review): Marshal.load can instantiate arbitrary objects, so this is
    # only safe while everything that writes to the store is trusted.
    def jobs
      redis_call(:zrange, 0, -1).map { |job| Marshal.load(job) }
    end

    # Returns every entry of the queue wrapped in Afterparty::JobContainer,
    # built from the entry/score pairs returned with +withscores+.
    def jobs_with_scores
      hash_from_scores(redis_call(:zrange, 0, -1, {withscores: true}))
    end

    # Returns the raw entries whose score lies between 0 and the current epoch
    # second. Entries are not unmarshalled here.
    def valid_jobs
      redis_call :zrangebyscore, 0, Time.now.to_i
    end

    # Returns the first entry of #valid_jobs, or nil when there is none.
    def next_valid_job
      valid_jobs.first
    end

    # True when #total_jobs_count is 0.
    def jobs_empty?
      total_jobs_count == 0
    end

    # Returns the :zcount of the queue over the full "-inf".."+inf" range.
    def total_jobs_count
      redis_call(:zcount, "-inf", "+inf")
    end

    # Returns the @@redis class variable.
    #
    # NOTE(review): nothing in this module assigns @@redis, and
    # #async_redis_call goes through Afterparty.redis instead - confirm this
    # variable is actually set for this module before relying on it.
    def redis
      @@redis
    end

    # Returns the last entry of the "completed" namespace, or nil if empty.
    def last_completed
      @temp_namespace = "completed"
      redis_call(:zrange, -1, -1).first
    end

    # Returns up to the last 20 entries of the "completed" namespace in
    # reversed order.
    def completed
      @temp_namespace = "completed"
      redis_call(:zrange, -20, -1).reverse
    end

    # Same selection as #completed, but each entry is wrapped together with its
    # score in an Afterparty::JobContainer.
    def completed_with_scores
      @temp_namespace = "completed"
      hash_from_scores(redis_call(:zrange, -20, -1, withscores: true)).reverse
    end

    private

    # Converts a list of [entry, score] pairs into an Array of
    # Afterparty::JobContainer. Despite the name, the result is an Array.
    def hash_from_scores(raw)
      raw.map { |group| Afterparty::JobContainer.new(group[0], group[1]) }
    end

    # returns true if job has an :execute_at value
    def job_valid?(job)
      job.respond_to?(:execute_at) && !job.execute_at.nil?
    end

    # return timestamp of :execute_at or current time
    def queue_time(job)
      job_valid?(job) ? job.execute_at.to_i : Time.now.to_i
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
afterparty-0.0.4 | lib/afterparty/queue_helpers.rb |
afterparty-0.0.3 | lib/afterparty/queue_helpers.rb |