Sha256: 820e6f061b428e1bd97fe6ff810e4bfa5d088b8abe99a0b97ccb03042b914e43

Contents?: true

Size: 1.56 KB

Versions: 2

Compression:

Stored size: 1.56 KB

Contents

# frozen_string_literal: true

module MiniScheduler
  class DistributedMutex
    class Timeout < StandardError
    end

    @default_redis = nil

    def self.redis=(redis)
      @default_redis = redis
    end

    def self.synchronize(key, redis = nil, &blk)
      self.new(key, redis || @default_redis).synchronize(&blk)
    end

    def initialize(key, redis)
      raise ArgumentError.new("redis argument is nil") if redis.nil?
      @key = key
      @redis = redis
      @mutex = Mutex.new
    end

    MAX_POLLING_ATTEMPTS ||= 60
    BASE_SLEEP_DURATION ||= 0.001
    MAX_SLEEP_DURATION ||= 1

    # NOTE wrapped in mutex to maintain its semantics
    def synchronize
      @mutex.lock

      attempts = 0
      sleep_duration = BASE_SLEEP_DURATION
      while !try_to_get_lock
        sleep(sleep_duration)

        if sleep_duration < MAX_SLEEP_DURATION
          sleep_duration = [sleep_duration * 2, MAX_SLEEP_DURATION].min
        end

        attempts += 1
        raise Timeout if attempts >= MAX_POLLING_ATTEMPTS
      end

      yield
    ensure
      @redis.del @key
      @mutex.unlock
    end

    private

    def try_to_get_lock
      got_lock = false
      if @redis.setnx @key, Time.now.to_i + 60
        @redis.expire @key, 60
        got_lock = true
      else
        begin
          @redis.watch @key
          time = @redis.get @key
          if time && time.to_i < Time.now.to_i
            got_lock = @redis.multi { @redis.set @key, Time.now.to_i + 60 }
          end
        ensure
          @redis.unwatch
        end
      end

      got_lock
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
mini_scheduler-0.18.0 lib/mini_scheduler/distributed_mutex.rb
mini_scheduler-0.17.0 lib/mini_scheduler/distributed_mutex.rb