Sha256: 57ba96550f7d4c80832720ff10aa70944ef9cdfb90071f771ff62f6a0a0846a8
Contents?: true
Size: 1.89 KB
Versions: 2
Compression:
Stored size: 1.89 KB
Contents
module Afterparty
  # Job queue that stores Marshal-serialized jobs via the Redis helpers mixed
  # in from Afterparty::QueueHelpers (redis_call, async_redis_call,
  # valid_jobs and queue_time are defined there, not in this file).
  #
  # All pushes and pops on one instance are serialized through a Mutex.
  class RedisQueue
    attr_accessor :redis, :options, :temp_namespace, :consumer
    include Afterparty::QueueHelpers

    # @param options [Hash] queue settings; :namespace defaults to "default"
    #   and :sleep (seconds between polls in #pop) defaults to 5. The hash is
    #   stored and updated in place.
    # @param consumer_options [Hash] currently unused (the consumer start-up
    #   below is commented out).
    def initialize options={}, consumer_options={}
      # @consumer = ThreadedQueueConsumer.new(self, consumer_options).start
      @options = options
      @options[:namespace] ||= "default"
      Afterparty.add_queue @options[:namespace]
      @options[:sleep] ||= 5
      @mutex = Mutex.new
    end

    # Serializes +job+ and adds it to the queue.
    #
    # The job's class gains +afterparty_job_id+ and +afterparty_queue+
    # accessors, which are populated before the job is dumped. The queue name
    # is @temp_namespace when set, otherwise the configured namespace;
    # @temp_namespace is cleared after the push.
    #
    # @param job [Object, nil] any Marshal-dumpable object; nil is ignored
    # @return [nil]
    def push job
      @mutex.synchronize do
        return nil if job.nil?

        job.class.module_eval do
          attr_accessor :afterparty_job_id, :afterparty_queue
        end

        queue_name = @temp_namespace || @options[:namespace]
        job.afterparty_queue = queue_name
        job.afterparty_job_id = Afterparty.next_job_id queue_name

        async_redis_call { redis_call :zadd, queue_time(job), Marshal.dump(job) }
        @temp_namespace = nil
      end
    end
    alias :<< :push
    # `eng` is kept for backward compatibility; it appears to be a typo for
    # `enq`, the conventional alias for push on Ruby's Queue.
    alias :eng :push
    alias :enq :push

    # Blocks until valid_jobs returns a non-empty collection, then removes
    # the first entry, re-adds it under the "completed" temp namespace scored
    # with the current time, and returns the deserialized job.
    #
    # Polls every @options[:sleep] seconds while holding the mutex, so a
    # blocked pop also blocks concurrent pushes on the same instance.
    #
    # @return [Object] the deserialized job
    # @raise [ArgumentError] re-raised when Marshal.load cannot rebuild the job
    def pop
      @mutex.synchronize do
        loop do
          jobs = valid_jobs
          unless jobs.empty?
            job_dump = jobs[0]

            async_redis_call do
              redis_call :zrem, job_dump
              # NOTE(review): @temp_namespace is never reset to nil in this
              # method, so it stays "completed" until the next push clears
              # it — confirm this is intended.
              @temp_namespace = "completed"
              redis_call :zadd, Time.now.to_i, job_dump
            end

            begin
              # NOTE(review): Marshal.load is unsafe on untrusted data; this
              # presumes everything in the Redis queue was written by #push.
              return Marshal.load(job_dump)
            rescue ArgumentError => e
              # Marshal.load raises ArgumentError (e.g. undefined class or
              # truncated data). The previous `ArgumentException` constant
              # does not exist in Ruby and turned this into a NameError.
              puts "You encountered an argument exception while deserializing a job."
              puts "Message: #{e.message}"
              raise
            end
          end

          sleep(@options[:sleep])
        end
      end
    end
  end

  # RedisQueue variant for tests: records exceptions passed to
  # #handle_exception instead of acting on them.
  class TestRedisQueue < RedisQueue
    attr_accessor :completed_jobs

    # Accepts the same arguments as RedisQueue#initialize (forwarded
    # implicitly by the bare `super`).
    def initialize opts={}, consumer_opts={}
      super
      @completed_jobs = []
      @exceptions = []
    end

    # Stores the failing job together with its exception.
    #
    # @param job [Object] the job that failed
    # @param exception [Exception] the error that was raised
    def handle_exception job, exception
      @exceptions << [job, exception]
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
afterparty-0.0.4 | lib/afterparty/redis_queue.rb |
afterparty-0.0.3 | lib/afterparty/redis_queue.rb |