lib/mongo-lock.rb in mongo-lock-1.0.0 vs lib/mongo-lock.rb in mongo-lock-1.1.0
- old
+ new
@@ -1,26 +1,31 @@
require 'mongo-lock/configuration'
+require 'mongo-lock/queries'
+require 'mongo-lock/class_convenience_methods'
module Mongo
class Lock
+ extend Mongo::Lock::ClassConvenienceMethods
+
class NotAcquiredError < StandardError ; end
class NotReleasedError < StandardError ; end
class NotExtendedError < StandardError ; end
attr_accessor :configuration
attr_accessor :key
attr_accessor :acquired
attr_accessor :expires_at
attr_accessor :released
+ attr_accessor :query
def self.configure options = {}, &block
defaults = {
- timeout_in: 10,
+ timeout_in: false,
limit: 100,
frequency: 1,
- expires_after: 10,
+ expire_in: 10,
raise: false,
owner: Proc.new { "#{`hostname`.strip}:#{Process.pid}:#{Thread.object_id}" }
}
defaults = defaults.merge(@@default_configuration) if defined?(@@default_configuration) && @@default_configuration
@@default_configuration = Configuration.new(defaults, options, &block)
@@ -32,128 +37,91 @@
else
@@default_configuration = configure
end
end
+
def self.release_all options = {}
if options.include? :collection
- release_collection configuration.collection(options[:collection]), options[:owner]
+ Mongo::Lock::Queries.release_collection configuration.collection(options[:collection]), options[:owner]
else
configuration.collections.each_pair do |key,collection|
- release_collection collection, options[:owner]
+ Mongo::Lock::Queries.release_collection collection, options[:owner]
end
end
end
- def self.release_collection collection, owner=nil
- selector = if owner then { owner: owner } else {} end
- collection.remove(selector)
- end
-
- def self.init_and_send key, options = {}, method
- lock = self.new(key, options)
- lock.send(method)
- lock
- end
-
- def self.acquire key, options = {}
- init_and_send key, options, :acquire
- end
-
- def self.release key, options = {}
- init_and_send key, options, :release
- end
-
- def self.acquire! key, options = {}
- init_and_send key, options, :acquire!
- end
-
- def self.release! key, options = {}
- init_and_send key, options, :release!
- end
-
- def self.available? key, options = {}
- init_and_send key, options, :available?
- end
-
def self.ensure_indexes
configuration.collections.each_pair do |key, collection|
- collection.create_index([
- ['key', Mongo::ASCENDING],
- ['owner', Mongo::ASCENDING],
- ['expires_at', Mongo::ASCENDING]
- ])
- collection.create_index([['ttl', Mongo::ASCENDING]],{ expireAfterSeconds: 0 })
+ Mongo::Lock::Queries.ensure_indexes collection
end
end
def self.clear_expired
configuration.collections.each_pair do |key,collection|
- collection.remove expires_at: { '$lt' => Time.now }
+ Mongo::Lock::Queries.clear_expired collection
end
end
+
def initialize key, options = {}
self.configuration = Configuration.new self.class.configuration.to_hash, options
self.key = key
+ self.query = Mongo::Lock::Queries.new self
acquire_if_acquired
end
+ # API
+
def configure options = {}, &block
self.configuration = Configuration.new self.configuration.to_hash, options
yield self.configuration if block_given?
end
def acquire options = {}
- options = configuration.to_hash.merge options
+ options = inherit_options options
i = 1
time_spent = 0
loop do
- # If timeout has expired
- if options[:timeout_in] && options[:timeout_in] < time_spent
- return raise_or_false options
+ result = try_acquire options, i, time_spent
+ return result unless result.nil?
- # If limit has expired
- elsif options[:limit] && options[:limit] < i
- return raise_or_false options
+ frequency = call_if_proc options[:frequency], i
+ sleep frequency
+ time_spent += frequency
+ i += 1
+ end
+ end
- # If there is an existing lock
- elsif existing_lock = find_or_insert(options)
+ def try_acquire options, i, time_spent
+ # If timeout has expired
+ if options[:timeout_in] && options[:timeout_in] < time_spent
+ return raise_or_false options
- # If the lock is owned by me
- if existing_lock['owner'] == options[:owner]
- self.acquired = true
- extend_by options[:expires_after]
- return true
- end
+ # If limit has expired
+ elsif options[:limit] && options[:limit] < i
+ return raise_or_false options
- # If the lock was acquired
- else
+ # If there is an existing lock
+ elsif existing_lock = query.find_or_insert(options)
+ # If the lock is owned by me
+ if existing_lock['owner'] == options[:owner]
self.acquired = true
+ extend_by options[:expire_in]
return true
-
end
- if options[:frequency].is_a? Proc
- frequency = options[:frequency].call(i)
- else
- frequency = options[:frequency]
- end
- sleep frequency
- time_spent += frequency
- i += 1
+ # If the lock was acquired
+ else
+ self.acquired = true
+ return true
end
end
- def acquire! options = {}
- options[:raise] = true
- acquire options
- end
-
def release options = {}
- options = configuration.to_hash.merge options
+ options = inherit_options options
# If the lock has already been released
if released?
return true
@@ -172,106 +140,63 @@
end
else
self.released = true
self.acquired = false
- collection.remove key: key, owner: options[:owner]
+ query.remove options
return true
end
end
- def release! options = {}
- options[:raise] = true
- release options
- end
-
- def raise_or_false options, error = NotAcquiredError
- raise error if options[:raise]
- false
- end
-
- def find_or_insert options
- to_expire_at = Time.now + options[:expires_after]
- existing_lock = collection.find_and_modify({
- query: query,
- update: {
- '$setOnInsert' => {
- key: key,
- owner: options[:owner],
- expires_at: to_expire_at,
- ttl: to_expire_at
- }
- },
- upsert: true
- })
-
- if existing_lock
- self.expires_at = existing_lock['expires_at']
- else
- self.expires_at = to_expire_at
- end
-
- existing_lock
- end
-
def extend_by time, options = {}
- options = configuration.to_hash.merge options
+ options = inherit_options options
# Can't extend a lock that hasn't been acquired
if !acquired?
return raise_or_false options, NotExtendedError
# Can't extend a lock that has started
elsif expired?
return raise_or_false options, NotExtendedError
else
- to_expire_at = expires_at + time
- existing_lock = collection.find_and_modify({
- query: query,
- update: {
- '$set' => {
- key: key,
- owner: options[:owner],
- expires_at: to_expire_at,
- ttl: to_expire_at
- }
- },
- upsert: true
- })
+ query.find_and_update time, options
true
end
end
def extend options = {}
- time = configuration.to_hash.merge(options)[:expires_after]
+ time = configuration.to_hash.merge(options)[:expire_in]
extend_by time, options
end
- def extend_by! time, options = {}
- options[:raise] = true
- extend_by time, options
+ def available? options = {}
+ options = inherit_options options
+ existing_lock = query.find_existing
+ !existing_lock || existing_lock['owner'] == options[:owner]
end
- def extend! options = {}
- options[:raise] = true
- extend options
+ # Raise methods
+
+ def acquire! options = {}
+ send_with_raise :acquire, options
end
- def available? options = {}
- options = configuration.to_hash.merge options
- existing_lock = collection.find(query).first
- !existing_lock || existing_lock['owner'] == options[:owner]
+ def release! options = {}
+ send_with_raise :release, options
end
- def query
- {
- key: key,
- expires_at: { '$gt' => Time.now }
- }
+ def extend_by! time, options = {}
+ send_with_raise :extend_by, time, options
end
+ def extend! options = {}
+ send_with_raise :extend, options
+ end
+
+ # Current state
+
def acquired?
!!acquired && !expired?
end
def expired?
@@ -280,16 +205,34 @@
def released?
!!released
end
+ # Utils
+
def acquire_if_acquired
- if (collection.find({
- key: key,
- owner: configuration.owner,
- expires_at: { '$gt' => Time.now }
- }).count > 0)
- self.acquired = true
+ self.acquired = true if query.is_acquired?
+ end
+
+ def send_with_raise method, *args
+ args.last[:raise] = true
+ self.send(method, *args)
+ end
+
+ def raise_or_false options, error = NotAcquiredError
+ raise error if options[:raise]
+ false
+ end
+
+ def inherit_options options
+ configuration.to_hash.merge options
+ end
+
+ def call_if_proc proc, *args
+ if proc.is_a? Proc
+ proc.call(*args)
+ else
+ proc
end
end
end
end