Sha256: e1567995e07c80f3a604c4c48f8441878f9051e5ecd1d069c8ea906a5ed6d284
Contents?: true
Size: 1.92 KB
Versions: 1
Compression:
Stored size: 1.92 KB
Contents
require 'logging' require 'dalli' require 'retryable' module MegaMutex class TimeoutError < Exception; end class DistributedMutex class << self def cache @cache ||= Dalli::Client.new MegaMutex.configuration.memcache_servers, :namespace => MegaMutex.configuration.namespace end end def initialize(key, timeout = nil) @key = key @timeout = timeout end def logger Logging::Logger[self] end def run(&block) @start_time = Time.now log "Attempting to lock mutex..." lock! log "Locked. Running critical section..." result = yield log "Critical section complete. Unlocking..." result ensure unlock! log "Unlocking Mutex." end def current_lock cache.get(@key) end private def timeout? return false unless @timeout Time.now > @start_time + @timeout end def log(message) logger.debug do "(key:#{@key}) (lock_id:#{my_lock_id}) #{message}" end end def lock! until timeout? retryable(:tries => 5, :sleep => 30, :on => Dalli::NetworkError) do return if attempt_to_lock == my_lock_id sleep 0.1 end end raise TimeoutError.new("Failed to obtain a lock within #{@timeout} seconds.") end def attempt_to_lock if current_lock.nil? set_current_lock my_lock_id end current_lock end def unlock! retryable(:tries => 5, :sleep => 30, :on => Dalli::NetworkError) do cache.delete(@key) if locked_by_me? end end def locked_by_me? current_lock == my_lock_id end def set_current_lock(new_lock) cache.add(@key, my_lock_id) end def my_lock_id @my_lock_id ||= "#{Process.pid.to_s}.#{self.object_id.to_s}.#{Time.now.to_i.to_s}" end def cache self.class.cache end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
mega_mutex-dalli-0.3.0 | lib/mega_mutex/distributed_mutex.rb |