lib/mongo-lock.rb in mongo-lock-1.0.0 vs lib/mongo-lock.rb in mongo-lock-1.1.0

- old
+ new

@@ -1,26 +1,31 @@ require 'mongo-lock/configuration' +require 'mongo-lock/queries' +require 'mongo-lock/class_convenience_methods' module Mongo class Lock + extend Mongo::Lock::ClassConvenienceMethods + class NotAcquiredError < StandardError ; end class NotReleasedError < StandardError ; end class NotExtendedError < StandardError ; end attr_accessor :configuration attr_accessor :key attr_accessor :acquired attr_accessor :expires_at attr_accessor :released + attr_accessor :query def self.configure options = {}, &block defaults = { - timeout_in: 10, + timeout_in: false, limit: 100, frequency: 1, - expires_after: 10, + expire_in: 10, raise: false, owner: Proc.new { "#{`hostname`.strip}:#{Process.pid}:#{Thread.object_id}" } } defaults = defaults.merge(@@default_configuration) if defined?(@@default_configuration) && @@default_configuration @@default_configuration = Configuration.new(defaults, options, &block) @@ -32,128 +37,91 @@ else @@default_configuration = configure end end + def self.release_all options = {} if options.include? :collection - release_collection configuration.collection(options[:collection]), options[:owner] + Mongo::Lock::Queries.release_collection configuration.collection(options[:collection]), options[:owner] else configuration.collections.each_pair do |key,collection| - release_collection collection, options[:owner] + Mongo::Lock::Queries.release_collection collection, options[:owner] end end end - def self.release_collection collection, owner=nil - selector = if owner then { owner: owner } else {} end - collection.remove(selector) - end - - def self.init_and_send key, options = {}, method - lock = self.new(key, options) - lock.send(method) - lock - end - - def self.acquire key, options = {} - init_and_send key, options, :acquire - end - - def self.release key, options = {} - init_and_send key, options, :release - end - - def self.acquire! key, options = {} - init_and_send key, options, :acquire! - end - - def self.release! key, options = {} - init_and_send key, options, :release! - end - - def self.available? key, options = {} - init_and_send key, options, :available? - end - def self.ensure_indexes configuration.collections.each_pair do |key, collection| - collection.create_index([ - ['key', Mongo::ASCENDING], - ['owner', Mongo::ASCENDING], - ['expires_at', Mongo::ASCENDING] - ]) - collection.create_index([['ttl', Mongo::ASCENDING]],{ expireAfterSeconds: 0 }) + Mongo::Lock::Queries.ensure_indexes collection end end def self.clear_expired configuration.collections.each_pair do |key,collection| - collection.remove expires_at: { '$lt' => Time.now } + Mongo::Lock::Queries.clear_expired collection end end + def initialize key, options = {} self.configuration = Configuration.new self.class.configuration.to_hash, options self.key = key + self.query = Mongo::Lock::Queries.new self acquire_if_acquired end + # API + def configure options = {}, &block self.configuration = Configuration.new self.configuration.to_hash, options yield self.configuration if block_given? end def acquire options = {} - options = configuration.to_hash.merge options + options = inherit_options options i = 1 time_spent = 0 loop do - # If timeout has expired - if options[:timeout_in] && options[:timeout_in] < time_spent - return raise_or_false options + result = try_acquire options, i, time_spent + return result unless result.nil? - # If limit has expired - elsif options[:limit] && options[:limit] < i - return raise_or_false options + frequency = call_if_proc options[:frequency], i + sleep frequency + time_spent += frequency + i += 1 + end + end - # If there is an existing lock - elsif existing_lock = find_or_insert(options) + def try_acquire options, i, time_spent + # If timeout has expired + if options[:timeout_in] && options[:timeout_in] < time_spent + return raise_or_false options - # If the lock is owned by me - if existing_lock['owner'] == options[:owner] - self.acquired = true - extend_by options[:expires_after] - return true - end + # If limit has expired + elsif options[:limit] && options[:limit] < i + return raise_or_false options - # If the lock was acquired - else + # If there is an existing lock + elsif existing_lock = query.find_or_insert(options) + # If the lock is owned by me + if existing_lock['owner'] == options[:owner] self.acquired = true + extend_by options[:expire_in] return true - end - if options[:frequency].is_a? Proc - frequency = options[:frequency].call(i) - else - frequency = options[:frequency] - end - sleep frequency - time_spent += frequency - i += 1 + # If the lock was acquired + else + self.acquired = true + return true end end - def acquire! options = {} - options[:raise] = true - acquire options - end - def release options = {} - options = configuration.to_hash.merge options + options = inherit_options options # If the lock has already been released if released? return true @@ -172,106 +140,63 @@ end else self.released = true self.acquired = false - collection.remove key: key, owner: options[:owner] + query.remove options return true end end - def release! options = {} - options[:raise] = true - release options - end - - def raise_or_false options, error = NotAcquiredError - raise error if options[:raise] - false - end - - def find_or_insert options - to_expire_at = Time.now + options[:expires_after] - existing_lock = collection.find_and_modify({ - query: query, - update: { - '$setOnInsert' => { - key: key, - owner: options[:owner], - expires_at: to_expire_at, - ttl: to_expire_at - } - }, - upsert: true - }) - - if existing_lock - self.expires_at = existing_lock['expires_at'] - else - self.expires_at = to_expire_at - end - - existing_lock - end - def extend_by time, options = {} - options = configuration.to_hash.merge options + options = inherit_options options # Can't extend a lock that hasn't been acquired if !acquired? return raise_or_false options, NotExtendedError # Can't extend a lock that has started elsif expired? return raise_or_false options, NotExtendedError else - to_expire_at = expires_at + time - existing_lock = collection.find_and_modify({ - query: query, - update: { - '$set' => { - key: key, - owner: options[:owner], - expires_at: to_expire_at, - ttl: to_expire_at - } - }, - upsert: true - }) + query.find_and_update time, options true end end def extend options = {} - time = configuration.to_hash.merge(options)[:expires_after] + time = configuration.to_hash.merge(options)[:expire_in] extend_by time, options end - def extend_by! time, options = {} - options[:raise] = true - extend_by time, options + def available? options = {} + options = inherit_options options + existing_lock = query.find_existing + !existing_lock || existing_lock['owner'] == options[:owner] end - def extend! options = {} - options[:raise] = true - extend options + # Raise methods + + def acquire! options = {} + send_with_raise :acquire, options end - def available? options = {} - options = configuration.to_hash.merge options - existing_lock = collection.find(query).first - !existing_lock || existing_lock['owner'] == options[:owner] + def release! options = {} + send_with_raise :release, options end - def query - { - key: key, - expires_at: { '$gt' => Time.now } - } + def extend_by! time, options = {} + send_with_raise :extend_by, time, options end + def extend! options = {} + send_with_raise :extend, options + end + + # Current state + def acquired? !!acquired && !expired? end def expired? @@ -280,16 +205,34 @@ def released? !!released end + # Utils + def acquire_if_acquired - if (collection.find({ - key: key, - owner: configuration.owner, - expires_at: { '$gt' => Time.now } - }).count > 0) - self.acquired = true + self.acquired = true if query.is_acquired? + end + + def send_with_raise method, *args + args.last[:raise] = true + self.send(method, *args) + end + + def raise_or_false options, error = NotAcquiredError + raise error if options[:raise] + false + end + + def inherit_options options + configuration.to_hash.merge options + end + + def call_if_proc proc, *args + if proc.is_a? Proc + proc.call(*args) + else + proc end end end end