lib/ohm.rb in ohm-0.1.0.rc5 vs lib/ohm.rb in ohm-0.1.0.rc6
- old
+ new
@@ -1,27 +1,28 @@
# encoding: UTF-8
require "base64"
require "redis"
+require "nest"
require File.join(File.dirname(__FILE__), "ohm", "pattern")
require File.join(File.dirname(__FILE__), "ohm", "validations")
require File.join(File.dirname(__FILE__), "ohm", "compat-1.8.6")
require File.join(File.dirname(__FILE__), "ohm", "key")
module Ohm
# Provides access to the Redis database. This is shared accross all models and instances.
- def redis
+ def self.redis
threaded[:redis] ||= connection(*options)
end
- def redis=(connection)
+ def self.redis=(connection)
threaded[:redis] = connection
end
- def threaded
+ def self.threaded
Thread.current[:ohm] ||= {}
end
# Connect to a redis database.
#
@@ -30,36 +31,34 @@
# @option options [#to_s] :port (6379) Port number.
# @option options [#to_s] :db (0) Database number.
# @option options [#to_s] :timeout (0) Database timeout in seconds.
# @example Connect to a database in port 6380.
# Ohm.connect(:port => 6380)
- def connect(*options)
+ def self.connect(*options)
self.redis = nil
@options = options
end
# Return a connection to Redis.
#
- # This is a wapper around Redis.new(options)
- def connection(*options)
- Redis.new(*options)
+ # This is a wapper around Redis.connect(options)
+ def self.connection(*options)
+ Redis.connect(*options)
end
- def options
+ def self.options
@options = [] unless defined? @options
@options
end
# Clear the database.
- def flush
+ def self.flush
redis.flushdb
end
- module_function :connect, :connection, :flush, :redis, :redis=, :options, :threaded
+ class Error < StandardError; end
- Error = Class.new(StandardError)
-
class Model
# Wraps a model name for lazy evaluation.
class Wrapper < BasicObject
def initialize(name, &block)
@@ -104,24 +103,18 @@
def add(model)
self << model
end
- def first(options = {})
- if options[:by]
- sort_by(options.delete(:by), options.merge(:limit => 1)).first
- else
- model[key.first(options)]
- end
- end
+ def sort(_options = {})
+ return [] unless key.exists
- def [](index)
- model[key[index]]
- end
+ options = _options.dup
+ options[:start] ||= 0
+ options[:limit] = [options[:start], options[:limit]] if options[:limit]
- def sort(*args)
- key.sort(*args).map(&model)
+ key.sort(options).map(&model)
end
# Sort the model instances by the given attribute.
#
# @example Sorting elements by name:
@@ -130,32 +123,32 @@
# User.create :name => "A"
#
# user = User.all.sort_by(:name, :order => "ALPHA").first
# user.name == "A"
# # => true
- def sort_by(att, options = {})
- options.merge!(:by => model.key("*->#{att}"))
+ def sort_by(att, _options = {})
+ return [] unless key.exists
+ options = _options.dup
+ options.merge!(:by => model.key["*->#{att}"])
+
if options[:get]
- key.sort(options.merge(:get => model.key("*->#{options[:get]}")))
+ key.sort(options.merge(:get => model.key["*->#{options[:get]}"]))
else
sort(options)
end
end
def clear
key.del
end
- def concat(models)
- models.each { |model| add(model) }
- self
- end
-
def replace(models)
- clear
- concat(models)
+ model.db.multi do
+ clear
+ models.each { |model| add(model) }
+ end
end
def empty?
!key.exists
end
@@ -202,42 +195,12 @@
source = keys(options)
target = source.inject(key.volatile) { |chain, other| chain - other }
apply(:sdiffstore, key, source, target)
end
- def sort(options = {})
- return [] unless key.exists
-
- options[:start] ||= 0
- options[:limit] = [options[:start], options[:limit]] if options[:limit]
-
- key.sort(options).map(&model)
- end
-
- # Sort the model instances by the given attribute.
- #
- # @example Sorting elements by name:
- #
- # User.create :name => "B"
- # User.create :name => "A"
- #
- # user = User.all.sort_by(:name, :order => "ALPHA").first
- # user.name == "A"
- # # => true
- def sort_by(att, options = {})
- return [] unless key.exists
-
- options.merge!(:by => model.key["*->#{att}"])
-
- if options[:get]
- key.sort(options.merge(:get => model.key["*->#{options[:get]}"]))
- else
- sort(options)
- end
- end
-
- def first(options = {})
+ def first(_options = {})
+ options = _options.dup
options.merge!(:limit => 1)
if options[:by]
sort_by(options.delete(:by), options).first
else
@@ -312,17 +275,15 @@
def first
self[0]
end
def pop
- id = key.rpop
- model[id] if id
+ model[key.rpop]
end
def shift
- id = key.lpop
- model[id] if id
+ model[key.lpop]
end
def unshift(model)
key.lpush(model.id)
end
@@ -568,11 +529,11 @@
@_memo[name] ||= instance_eval(&block)
end
end
def self.[](id)
- new(:id => id) if exists?(id)
+ new(:id => id) if id && exists?(id)
end
def self.to_proc
Proc.new { |id| self[id] }
end
@@ -673,11 +634,11 @@
# Increment the counter denoted by :att.
#
# @param att [Symbol] Attribute to increment.
def incr(att, count = 1)
raise ArgumentError, "#{att.inspect} is not a counter." unless counters.include?(att)
- write_local(att, db.hincrby(key, att, count))
+ write_local(att, key.hincrby(att, count))
end
# Decrement the counter denoted by :att.
#
# @param att [Symbol] Attribute to decrement.
@@ -805,28 +766,28 @@
def write
unless (attributes + counters).empty?
atts = (attributes + counters).inject([]) { |ret, att|
value = send(att).to_s
- ret.push(att, value) if not value.empty?
+ ret.push(att, value) if not value.empty?
ret
}
db.multi do
- db.del(key)
- db.hmset(key, *atts.flatten) if atts.any?
+ key.del
+ key.hmset(*atts.flatten) if atts.any?
end
end
end
def write_remote(att, value)
write_local(att, value)
if value.to_s.empty?
- db.hdel(key, att)
+ key.hdel(att)
else
- db.hset(key, att, value)
+ key.hset(att, value)
end
end
def self.const_missing(name)
wrapper = Wrapper.new(name) { const_get(name) }
@@ -854,15 +815,15 @@
def self.key
Key.new(self, db)
end
def self.exists?(id)
- db.sismember(key[:all], id)
+ key[:all].sismember(id)
end
def initialize_id
- self.id = db.incr(self.class.key[:id]).to_s
+ @id ||= self.class.key[:id].incr.to_s
end
def db
self.class.db
end
@@ -874,11 +835,11 @@
def create_model_membership
self.class.all << self
end
def delete_model_membership
- db.del(key)
+ key.del
self.class.all.delete(self)
end
def update_indices
delete_from_indices
@@ -901,20 +862,20 @@
value.kind_of?(String) == false
end
def add_to_index(att, value = send(att))
index = index_key_for(att, value)
- db.sadd(index, id)
- db.sadd(key[:_indices], index)
+ index.sadd(id)
+ key[:_indices].sadd(index)
end
def delete_from_indices
- db.smembers(key[:_indices]).each do |index|
+ key[:_indices].smembers.each do |index|
db.srem(index, id)
end
- db.del(key[:_indices])
+ key[:_indices].del
end
def read_local(att)
@_attributes[att]
end
@@ -923,11 +884,11 @@
@_attributes[att] = value
end
def read_remote(att)
unless new?
- value = db.hget(key, att)
+ value = key.hget(att)
value.respond_to?(:force_encoding) ?
value.force_encoding("UTF-8") :
value
end
end
@@ -957,29 +918,25 @@
# This method implements the design pattern for locks
# described at: http://code.google.com/p/redis/wiki/SetnxCommand
#
# @see Model#mutex
def lock!
- until db.setnx(key[:_lock], lock_timeout)
- next unless lock = db.get(key[:_lock])
- sleep(0.5) and next unless lock_expired?(lock)
+ until key[:_lock].setnx(Time.now.to_f + 0.5)
+ next unless timestamp = key[:_lock].get
+ sleep(0.1) and next unless lock_expired?(timestamp)
- break unless lock = db.getset(key[:_lock], lock_timeout)
- break if lock_expired?(lock)
+ break unless timestamp = key[:_lock].getset(Time.now.to_f + 0.5)
+ break if lock_expired?(timestamp)
end
end
# Release the lock.
# @see Model#mutex
def unlock!
- db.del(key[:_lock])
+ key[:_lock].del
end
- def lock_timeout
- Time.now.to_f + 1
- end
-
- def lock_expired? lock
- lock.to_f < Time.now.to_f
+ def lock_expired? timestamp
+ timestamp.to_f < Time.now.to_f
end
end
end