Sha256: 491f3471133af593200423322091ee287bcfdc7e3bbf29fa37eb808b14347a4b

Contents?: true

Size: 1.53 KB

Versions: 2

Compression:

Stored size: 1.53 KB

Contents

require 'resque-unique_at_runtime/version'

module Resque
  module Plugins
    module UniqueAtRuntime
      LOCK_TIMEOUT = 60 * 60 * 24 * 5 # 5 days

      def lock_timeout
        Time.now.to_i + LOCK_TIMEOUT + 1
      end

      def requeue_interval
        self.instance_variable_get(:@requeue_interval) || 1
      end

      # Overwrite this method to uniquely identify which mutex should be used
      # for a resque worker.
      def unique_at_runtime_redis_key(*_)
        "unique_at_runtime:#{@queue}"
      end

      def can_lock_queue?(*args)
        now = Time.now.to_i
        key = unique_at_runtime_redis_key(*args)
        timeout = lock_timeout

        # Per http://redis.io/commands/setnx
        return true  if Resque.redis.setnx(key, timeout)
        return false if Resque.redis.get(key).to_i > now
        return true  if Resque.redis.getset(key, timeout).to_i <= now
        return false
      end

      def unlock_queue(*args)
        Resque.redis.del(unique_at_runtime_redis_key(*args))
      end

      def reenqueue(*args)
        Resque.enqueue(self, *args)
      end

      def before_perform(*args)
        unless can_lock_queue?(*args)
          # Sleep so the CPU's rest
          sleep(requeue_interval)

          # can't get the lock, so re-enqueue the task
          reenqueue(*args)

          # and don't perform
          raise Resque::Job::DontPerform
        end
      end

      def around_perform(*args)
        begin
          yield
        ensure
          unlock_queue(*args)
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
resque-unique_at_runtime-2.0.1 lib/resque-unique_at_runtime.rb
resque-unique_at_runtime-2.0.0 lib/resque-unique_at_runtime.rb