lib/ohm.rb in ohm-0.1.0.rc5 vs lib/ohm.rb in ohm-0.1.0.rc6

- old
+ new

@@ -1,27 +1,28 @@ # encoding: UTF-8 require "base64" require "redis" +require "nest" require File.join(File.dirname(__FILE__), "ohm", "pattern") require File.join(File.dirname(__FILE__), "ohm", "validations") require File.join(File.dirname(__FILE__), "ohm", "compat-1.8.6") require File.join(File.dirname(__FILE__), "ohm", "key") module Ohm # Provides access to the Redis database. This is shared accross all models and instances. - def redis + def self.redis threaded[:redis] ||= connection(*options) end - def redis=(connection) + def self.redis=(connection) threaded[:redis] = connection end - def threaded + def self.threaded Thread.current[:ohm] ||= {} end # Connect to a redis database. # @@ -30,36 +31,34 @@ # @option options [#to_s] :port (6379) Port number. # @option options [#to_s] :db (0) Database number. # @option options [#to_s] :timeout (0) Database timeout in seconds. # @example Connect to a database in port 6380. # Ohm.connect(:port => 6380) - def connect(*options) + def self.connect(*options) self.redis = nil @options = options end # Return a connection to Redis. # - # This is a wapper around Redis.new(options) - def connection(*options) - Redis.new(*options) + # This is a wapper around Redis.connect(options) + def self.connection(*options) + Redis.connect(*options) end - def options + def self.options @options = [] unless defined? @options @options end # Clear the database. - def flush + def self.flush redis.flushdb end - module_function :connect, :connection, :flush, :redis, :redis=, :options, :threaded + class Error < StandardError; end - Error = Class.new(StandardError) - class Model # Wraps a model name for lazy evaluation. class Wrapper < BasicObject def initialize(name, &block) @@ -104,24 +103,18 @@ def add(model) self << model end - def first(options = {}) - if options[:by] - sort_by(options.delete(:by), options.merge(:limit => 1)).first - else - model[key.first(options)] - end - end + def sort(_options = {}) + return [] unless key.exists - def [](index) - model[key[index]] - end + options = _options.dup + options[:start] ||= 0 + options[:limit] = [options[:start], options[:limit]] if options[:limit] - def sort(*args) - key.sort(*args).map(&model) + key.sort(options).map(&model) end # Sort the model instances by the given attribute. # # @example Sorting elements by name: @@ -130,32 +123,32 @@ # User.create :name => "A" # # user = User.all.sort_by(:name, :order => "ALPHA").first # user.name == "A" # # => true - def sort_by(att, options = {}) - options.merge!(:by => model.key("*->#{att}")) + def sort_by(att, _options = {}) + return [] unless key.exists + options = _options.dup + options.merge!(:by => model.key["*->#{att}"]) + if options[:get] - key.sort(options.merge(:get => model.key("*->#{options[:get]}"))) + key.sort(options.merge(:get => model.key["*->#{options[:get]}"])) else sort(options) end end def clear key.del end - def concat(models) - models.each { |model| add(model) } - self - end - def replace(models) - clear - concat(models) + model.db.multi do + clear + models.each { |model| add(model) } + end end def empty? !key.exists end @@ -202,42 +195,12 @@ source = keys(options) target = source.inject(key.volatile) { |chain, other| chain - other } apply(:sdiffstore, key, source, target) end - def sort(options = {}) - return [] unless key.exists - - options[:start] ||= 0 - options[:limit] = [options[:start], options[:limit]] if options[:limit] - - key.sort(options).map(&model) - end - - # Sort the model instances by the given attribute. - # - # @example Sorting elements by name: - # - # User.create :name => "B" - # User.create :name => "A" - # - # user = User.all.sort_by(:name, :order => "ALPHA").first - # user.name == "A" - # # => true - def sort_by(att, options = {}) - return [] unless key.exists - - options.merge!(:by => model.key["*->#{att}"]) - - if options[:get] - key.sort(options.merge(:get => model.key["*->#{options[:get]}"])) - else - sort(options) - end - end - - def first(options = {}) + def first(_options = {}) + options = _options.dup options.merge!(:limit => 1) if options[:by] sort_by(options.delete(:by), options).first else @@ -312,17 +275,15 @@ def first self[0] end def pop - id = key.rpop - model[id] if id + model[key.rpop] end def shift - id = key.lpop - model[id] if id + model[key.lpop] end def unshift(model) key.lpush(model.id) end @@ -568,11 +529,11 @@ @_memo[name] ||= instance_eval(&block) end end def self.[](id) - new(:id => id) if exists?(id) + new(:id => id) if id && exists?(id) end def self.to_proc Proc.new { |id| self[id] } end @@ -673,11 +634,11 @@ # Increment the counter denoted by :att. # # @param att [Symbol] Attribute to increment. def incr(att, count = 1) raise ArgumentError, "#{att.inspect} is not a counter." unless counters.include?(att) - write_local(att, db.hincrby(key, att, count)) + write_local(att, key.hincrby(att, count)) end # Decrement the counter denoted by :att. # # @param att [Symbol] Attribute to decrement. @@ -805,28 +766,28 @@ def write unless (attributes + counters).empty? atts = (attributes + counters).inject([]) { |ret, att| value = send(att).to_s - ret.push(att, value) if not value.empty? + ret.push(att, value) if not value.empty? ret } db.multi do - db.del(key) - db.hmset(key, *atts.flatten) if atts.any? + key.del + key.hmset(*atts.flatten) if atts.any? end end end def write_remote(att, value) write_local(att, value) if value.to_s.empty? - db.hdel(key, att) + key.hdel(att) else - db.hset(key, att, value) + key.hset(att, value) end end def self.const_missing(name) wrapper = Wrapper.new(name) { const_get(name) } @@ -854,15 +815,15 @@ def self.key Key.new(self, db) end def self.exists?(id) - db.sismember(key[:all], id) + key[:all].sismember(id) end def initialize_id - self.id = db.incr(self.class.key[:id]).to_s + @id ||= self.class.key[:id].incr.to_s end def db self.class.db end @@ -874,11 +835,11 @@ def create_model_membership self.class.all << self end def delete_model_membership - db.del(key) + key.del self.class.all.delete(self) end def update_indices delete_from_indices @@ -901,20 +862,20 @@ value.kind_of?(String) == false end def add_to_index(att, value = send(att)) index = index_key_for(att, value) - db.sadd(index, id) - db.sadd(key[:_indices], index) + index.sadd(id) + key[:_indices].sadd(index) end def delete_from_indices - db.smembers(key[:_indices]).each do |index| + key[:_indices].smembers.each do |index| db.srem(index, id) end - db.del(key[:_indices]) + key[:_indices].del end def read_local(att) @_attributes[att] end @@ -923,11 +884,11 @@ @_attributes[att] = value end def read_remote(att) unless new? - value = db.hget(key, att) + value = key.hget(att) value.respond_to?(:force_encoding) ? value.force_encoding("UTF-8") : value end end @@ -957,29 +918,25 @@ # This method implements the design pattern for locks # described at: http://code.google.com/p/redis/wiki/SetnxCommand # # @see Model#mutex def lock! - until db.setnx(key[:_lock], lock_timeout) - next unless lock = db.get(key[:_lock]) - sleep(0.5) and next unless lock_expired?(lock) + until key[:_lock].setnx(Time.now.to_f + 0.5) + next unless timestamp = key[:_lock].get + sleep(0.1) and next unless lock_expired?(timestamp) - break unless lock = db.getset(key[:_lock], lock_timeout) - break if lock_expired?(lock) + break unless timestamp = key[:_lock].getset(Time.now.to_f + 0.5) + break if lock_expired?(timestamp) end end # Release the lock. # @see Model#mutex def unlock! - db.del(key[:_lock]) + key[:_lock].del end - def lock_timeout - Time.now.to_f + 1 - end - - def lock_expired? lock - lock.to_f < Time.now.to_f + def lock_expired? timestamp + timestamp.to_f < Time.now.to_f end end end