Sha256: 8b2afe425438c0cde8df6eb893f28e398fa6d1c492959af84f49be7afe3903b5

Contents?: true

Size: 1.86 KB

Versions: 4

Compression:

Stored size: 1.86 KB

Contents

module Resque
  module DelayedJob
    def before_enqueue_with_delay(*args)
      unless Resque.delayed_queue?(self)
        raise DelayedQueueError.new 'trying to insert a delayed job into a non-delayed queue' 
      end
      unless args[0].is_a?(Hash) && args[0].has_key?(:delay_until)
        raise DelayedQueueError.new 'trying to insert delayed job without delay_until'
      end
    end

    def before_create_with_delay(item, *args)
      item[:delay_until] = args[0][:delay_until]
    end

    def before_pop_with_delay(query)
      query['delay_until'] = {'$lt' => Time.now } if delayed_queue?(queue)
    end
  end
  
  module Delayed
    def self.extended(base)
      base.class_eval { @delayed_queues = [] }
    end

    def initialize_delayed
      delayed_queues = mongo_stats.find_one(:stat => 'Delayed Queues')
      @delayed_queues = delayed_queues['value'] if delayed_queues
    end

    def delayed_job?(klass)
      klass.instance_variable_get(:@delayed) ||
        (klass.respond_to?(:delayed) and klass.delayed)
    end

    def delayed_queue?(queue)
      @delayed_queues.include? namespace_queue(queue)
    end

    def enable_delay(queue)
      queue = namespace_queue(queue)
      unless delayed_queue? queue
        @delayed_queues << queue
        mongo_stats.update({:stat => 'Delayed Queues'}, {'$addToSet' => {'value' => queue}}, {:upsert => true})
      end
    end

    def delayed_size(queue)
      queue = namespace_queue(queue)
      if delayed_queue? queue
        mongo[queue].find({'delay_until' => { '$gt' => Time.now }}).count
      else
        mongo[queue].count
      end
    end

    def ready_size(queue)
      queue = namespace_queue(queue)
      if delayed_queue? queue
        mongo[queue].find({'delay_until' => { '$lt' => Time.now }}).count
      else
        mongo[queue].count
      end
    end
  end
  
  class DelayedQueueError < RuntimeError; end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
mongo-resque-1.20.0 lib/resque/delayed.rb
mongo-resque-1.19.0.1 lib/resque/delayed.rb
mongo-resque-1.19.0 lib/resque/delayed.rb
mongo-resque-1.18.2 lib/resque/delayed.rb