lib/ohm.rb in ohm-1.4.0 vs lib/ohm.rb in ohm-2.0.0.alpha1

- old
+ new

@@ -1,15 +1,17 @@ # encoding: UTF-8 -require "nest" -require "redis" +require "msgpack" +require "nido" +require "redic" require "securerandom" -require "scrivener" -require "ohm/transaction" require "ohm/command" module Ohm + LUA_CACHE = Hash.new { |h, k| h[k] = Hash.new } + LUA_SAVE = File.expand_path("../ohm/lua/save.lua", __FILE__) + LUA_DELETE = File.expand_path("../ohm/lua/delete.lua", __FILE__) # All of the known errors in Ohm can be traced back to one of these # exceptions. # # MissingID: @@ -66,80 +68,46 @@ when Symbol then context.const_get(name) else name end end - if Redis::VERSION >= "3.0.0" - def self.dict(dict) - dict - end - else - def self.dict(arr) - Hash[*arr] - end + def self.dict(arr) + Hash[*arr] end - end - class Connection - attr_accessor :context - attr_accessor :options + def self.sort(redis, key, options) + args = [] - def initialize(context = :main, options = {}) - @context = context - @options = options - end + args.concat(["BY", options[:by]]) if options[:by] + args.concat(["GET", options[:get]]) if options[:get] + args.concat(["LIMIT"] + options[:limit]) if options[:limit] + args.concat(options[:order].split(" ")) if options[:order] + args.concat(["STORE", options[:store]]) if options[:store] - def reset! - threaded[context] = nil + redis.call("SORT", key, *args) end - - def start(options = {}) - self.options = options - self.reset! - end - - def redis - threaded[context] ||= Redis.connect(options) - end - - def threaded - Thread.current[:ohm] ||= {} - end end - def self.conn - @conn ||= Connection.new - end - - # Stores the connection options for the Redis instance. - # - # Examples: - # - # Ohm.connect(:port => 6380, :db => 1, :host => "10.0.1.1") - # Ohm.connect(:url => "redis://10.0.1.1:6380/1") - # - # All of the options are simply passed on to `Redis.connect`. - # - def self.connect(options = {}) - conn.start(options) - end - # Use this if you want to do quick ad hoc redis commands against the # defined Ohm connection. # # Examples: # - # Ohm.redis.keys("User:*") - # Ohm.redis.set("foo", "bar") + # Ohm.redis.call("SET", "foo", "bar") + # Ohm.redis.call("FLUSH") # def self.redis - conn.redis + @redis ||= Redic.new end - # Wrapper for Ohm.redis.flushdb. + def self.redis=(redis) + @redis = redis + end + + # Wrapper for Ohm.redis.call("FLUSHDB"). def self.flush - redis.flushdb + redis.call("FLUSHDB") end module Collection include Enumerable @@ -162,23 +130,23 @@ size == 0 end # Wraps the whole pipelining functionality. def fetch(ids) - arr = db.pipelined do - ids.each { |id| db.hgetall(namespace[id]) } + ids.each do |id| + redis.queue("HGETALL", namespace[id]) end - res = [] + data = redis.commit - return res if arr.nil? + return [] if data.nil? - arr.each_with_index do |atts, idx| - res << model.new(Utils.dict(atts).update(:id => ids[idx])) + [].tap do |result| + data.each_with_index do |atts, idx| + result << model.new(Utils.dict(atts).update(:id => ids[idx])) + end end - - res end end class List include Collection @@ -193,22 +161,22 @@ @model = model end # Returns the total size of the list using LLEN. def size - db.llen(key) + redis.call("LLEN", key) end alias :count :size # Returns the first element of the list using LINDEX. def first - model[db.lindex(key, 0)] + model[redis.call("LINDEX", key, 0)] end # Returns the last element of the list using LINDEX. def last - model[db.lindex(key, -1)] + model[redis.call("LINDEX", key, -1)] end # Checks if the model is part of this List. # # An important thing to note is that this method loads all of the @@ -237,24 +205,25 @@ # # => false # def replace(models) ids = models.map { |model| model.id } - model.db.multi do - db.del(key) - ids.each { |id| db.rpush(key, id) } - end + redis.queue("MULTI") + redis.queue("DEL", key) + ids.each { |id| redis.queue("RPUSH", key, id) } + redis.queue("EXEC") + redis.commit end # Pushes the model to the _end_ of the list using RPUSH. def push(model) - db.rpush(key, model.id) + redis.call("RPUSH", key, model.id) end # Pushes the model to the _beginning_ of the list using LPUSH. def unshift(model) - db.lpush(key, model.id) + redis.call("LPUSH", key, model.id) end # Delete a model from the list. # # Note: If your list contains the model multiple times, this method @@ -281,20 +250,20 @@ # # => true # def delete(model) # LREM key 0 <id> means remove all elements matching <id> # @see http://redis.io/commands/lrem - db.lrem(key, 0, model.id) + redis.call("LREM", key, 0, model.id) end private def ids - db.lrange(key, 0, -1) + redis.call("LRANGE", key, 0, -1) end - def db - model.db + def redis + model.redis end end # Defines most of the methods used by `Set` and `MultiSet`. class BasicSet @@ -344,14 +313,14 @@ # # => true # def sort(options = {}) if options.has_key?(:get) options[:get] = to_key(options[:get]) - return execute { |key| db.sort(key, options) } + return execute { |key| Utils.sort(redis, key, options) } end - fetch(execute { |key| db.sort(key, options) }) + fetch(execute { |key| Utils.sort(redis, key, options) }) end # Check if a model is included in this set. # # Example: @@ -368,11 +337,11 @@ exists?(model.id) end # Returns the total size of the set using SCARD. def size - execute { |key| db.scard(key) } + execute { |key| redis.call("SCARD", key) } end alias :count :size # Syntactic sugar for `sort_by` or `sort` when you only need the # first element. @@ -396,11 +365,11 @@ end end # Grab all the elements of this set using SMEMBERS. def ids - execute { |key| db.smembers(key) } + execute { |key| redis.call("SMEMBERS", key) } end # Retrieve a specific element using an ID from this set. # # Example: @@ -415,11 +384,11 @@ model[id] if exists?(id) end private def exists?(id) - execute { |key| db.sismember(key, id) } + execute { |key| redis.call("SISMEMBER", key, id) == 1 } end def to_key(att) if model.counters.include?(att) namespace["*:counters->%s" % att] @@ -465,24 +434,10 @@ # def except(dict) MultiSet.new(namespace, model, key).except(dict) end - # Perform an intersection between the existent set and - # the new set created by the union of the passed filters. - # - # Example: - # - # set = User.find(:status => "active") - # set.combine(:name => ["John", "Jane"]) - # - # # The result will include all users with active status - # # and with names "John" or "Jane". - def combine(dict) - MultiSet.new(namespace, model, key).combine(dict) - end - # Do a union to the existing set using any number of filters. # # Example: # # set = User.find(:name => "John") @@ -498,12 +453,12 @@ private def execute yield key end - def db - model.db + def redis + model.redis end end class MutableSet < Set # Add a model directly to the set. @@ -514,11 +469,11 @@ # post = Post.create # # user.posts.add(post) # def add(model) - db.sadd(key, model.id) + redis.call("SADD", key, model.id) end alias_method :<<, :add # Remove a model directly from the set. @@ -529,11 +484,11 @@ # post = Post.create # # user.posts.delete(post) # def delete(model) - db.srem(key, model.id) + redis.call("SREM", key, model.id) end # Replace all the existing elements of a set with a different # collection of models. This happens atomically in a MULTI-EXEC # block. @@ -551,14 +506,15 @@ # # => false # def replace(models) ids = models.map { |model| model.id } - key.redis.multi do - db.del(key) - ids.each { |id| db.sadd(key, id) } - end + redis.queue("MULTI") + redis.queue("DEL", key) + ids.each { |id| redis.queue("SADD", key, id) } + redis.queue("EXEC") + redis.commit end end # Anytime you filter a set with more than one requirement, you # internally use a `MultiSet`. `MutiSet` is a bit slower than just @@ -610,30 +566,14 @@ # # You can also do it in one line. # User.find(:name => "John").except(:country => "US") # def except(dict) MultiSet.new( - namespace, model, Command[:sdiffstore, command, unioned(dict)] + namespace, model, Command[:sdiffstore, command, intersected(dict)] ) end - # Perform an intersection between the existent set and - # the new set created by the union of the passed filters. - # - # Example: - # - # set = User.find(:status => "active") - # set.combine(:name => ["John", "Jane"]) - # - # # The result will include all users with active status - # # and with names "John" or "Jane". - def combine(dict) - MultiSet.new( - namespace, model, Command[:sinterstore, command, unioned(dict)] - ) - end - # Do a union to the existing set using any number of filters. # # Example: # # set = User.find(:name => "John") @@ -647,34 +587,29 @@ namespace, model, Command[:sunionstore, command, intersected(dict)] ) end private - def db - model.db + def redis + model.redis end def intersected(dict) Command[:sinterstore, *model.filters(dict)] end - - def unioned(dict) - Command[:sunionstore, *model.filters(dict)] - end - def execute # namespace[:tmp] is where all the temp keys should be stored in. - # db will be where all the commands are executed against. - res = command.call(namespace[:tmp], db) + # redis will be where all the commands are executed against. + response = command.call(namespace[:tmp], redis) begin # At this point, we have the final aggregated set, which we yield # to the caller. the caller can do all the normal set operations, # i.e. SCARD, SMEMBERS, etc. - yield res + yield response ensure # We have to make sure we clean up the temporary keys to avoid # memory leaks and the unintended explosion of memory usage. @@ -731,48 +666,36 @@ # (For brevity, let's assume the Post created has an ID of 1). # # SADD User:1:posts 1 # class Model - include Scrivener::Validations - - def self.conn - @conn ||= Connection.new(name, Ohm.conn.options) + def self.redis=(redis) + @redis = redis end - def self.connect(options) - @key = nil - @lua = nil - conn.start(options) + def self.redis + @redis ||= Redic.new(Ohm.redis.url) end - def self.db - conn.redis - end - - def self.lua - @lua ||= Lua.new(File.join(Dir.pwd, "lua"), db) - end - # The namespace for all the keys generated using this model. # # Example: # # class User < Ohm::Model # # User.key == "User" # User.key.kind_of?(String) # # => true # - # User.key.kind_of?(Nest) + # User.key.kind_of?(Nido) # # => true # - # To find out more about Nest, see: - # http://github.com/soveran/nest + # To find out more about Nido, see: + # http://github.com/soveran/nido # def self.key - @key ||= Nest.new(self.name, db) + @key ||= Nido.new(self.name) end # Retrieve a record by ID. # # Example: @@ -801,11 +724,11 @@ lambda { |id| self[id] } end # Check if the ID exists within <Model>:all. def self.exists?(id) - db.sismember(key[:all], id) + redis.call("SISMEMBER", key[:all], id) == 1 end # Find values in `unique` indices. # # Example: @@ -819,11 +742,11 @@ # # => true # def self.with(att, val) raise IndexNotFound unless uniques.include?(att) - id = db.hget(key[:uniques][att], val) + id = redis.call("HGET", key[:uniques][att], val) new(:id => id).load! if id end # Find values in indexed fields. # @@ -1102,11 +1025,11 @@ counters << name unless counters.include?(name) define_method(name) do return 0 if new? - db.hget(key[:counters], name).to_i + redis.call("HGET", key[:counters], name).to_i end end # An Ohm::Set wrapper for Model.key[:all]. def self.all @@ -1180,11 +1103,11 @@ end # Preload all the attributes of this model from Redis. Used # internally by `Model::[]`. def load! - update_attributes(db.hgetall(key)) unless new? + update_attributes(Utils.dict(redis.call("HGETALL", key))) unless new? return self end # Read an attribute remotely from Redis. Useful if you want to get # the most recent value of the attribute and not rely on locally @@ -1201,31 +1124,36 @@ # u.save | # | u.name == "A" # | u.get(:name) == "B" # def get(att) - @attributes[att] = db.hget(key, att) + @attributes[att] = redis.call("HGET", key, att) end # Update an attribute value atomically. The best usecase for this # is when you simply want to update one value. # # Note: This method is dangerous because it doesn't update indices # and uniques. Use it wisely. The safe equivalent is `update`. # def set(att, val) - val.to_s.empty? ? db.hdel(key, att) : db.hset(key, att, val) + if val.to_s.empty? + redis.call("HDEL", key, att) + else + redis.call("HSET", key, att, val) + end + @attributes[att] = val end def new? !defined?(@id) end # Increment a counter atomically. Internally uses HINCRBY. def incr(att, count = 1) - db.hincrby(key[:counters], att, count) + redis.call("HINCRBY", key[:counters], att, count) end # Decrement a counter atomically. Internally uses HINCRBY. def decr(att, count = 1) incr(att, -count) @@ -1250,12 +1178,12 @@ def attributes @attributes end - # Export the ID and the errors of the model. The approach of Ohm - # is to whitelist public attributes, as opposed to exporting each + # Export the ID of the model. The approach of Ohm is to + # whitelist public attributes, as opposed to exporting each # (possibly sensitive) attribute. # # Example: # # class User < Ohm::Model @@ -1281,95 +1209,60 @@ # # => { :id => "1", :name => "John" } # def to_hash attrs = {} attrs[:id] = id unless new? - attrs[:errors] = errors if errors.any? return attrs end + # Persist the model attributes and update indices and unique # indices. The `counter`s and `set`s are not touched during save. # - # If the model is not valid, nil is returned. Otherwise, the - # persisted model is returned. - # # Example: # # class User < Ohm::Model # attribute :name - # - # def validate - # assert_present :name - # end # end # - # User.new(:name => nil).save - # # => nil - # # u = User.new(:name => "John").save # u.kind_of?(User) # # => true # - def save(&block) - return if not valid? - save!(&block) - end + def save + indices = {} + model.indices.each { |field| indices[field] = Array(send(field)) } - # Saves the model without checking for validity. Refer to - # `Model#save` for more details. - def save! - t = __save__ - yield t if block_given? - t.commit(db) + uniques = {} + model.uniques.each { |field| uniques[field] = send(field) } - return self - end + _initialize_id if new? - def __save__ - Transaction.new do |t| - t.watch(*_unique_keys) + attrs = attributes.delete_if do |k, v| + v.nil? + end - if not new? - t.watch(key) - t.watch(key[:_indices]) if model.indices.any? - t.watch(key[:_uniques]) if model.uniques.any? - end + response = script(LUA_SAVE, 0, + { "name" => model.name, + "id" => id, + "key" => key + }.to_msgpack, + attrs.flatten.to_msgpack, + indices.to_msgpack, + uniques.to_msgpack + ) - t.before do - _initialize_id if new? + if response.is_a?(RuntimeError) + if response.message =~ /(UniqueIndexViolation: (\w+))/ + raise UniqueIndexViolation, $1 + else + raise response end - - _uniques = nil - uniques = nil - _indices = nil - indices = nil - existing_indices = nil - existing_uniques = nil - - t.read do - _verify_uniques - existing_indices = _read_attributes(model.indices) if model.indices.any? - existing_uniques = _read_attributes(model.uniques) if model.uniques.any? - _uniques = db.hgetall(key[:_uniques]) - _indices = db.smembers(key[:_indices]) - uniques = _read_index_type(:uniques) - indices = _read_index_type(:indices) - end - - t.write do - db.sadd(model.key[:all], id) - _delete_existing_indices(existing_indices) - _delete_existing_uniques(existing_uniques) - _delete_indices(_indices) - _delete_uniques(_uniques) - _save - _save_indices(indices) - _save_uniques(uniques) - end end + + return self end # Delete the model, including all the following keys: # # - <Model>:<id> @@ -1377,42 +1270,40 @@ # - <Model>:<id>:<set name> # # If the model has uniques or indices, they're also cleaned up. # def delete - transaction do |t| - _uniques = nil - _indices = nil - existing_indices = nil - existing_uniques = nil + uniques = {} + model.uniques.each { |field| uniques[field] = send(field) } - t.watch(*_unique_keys) + script(LUA_DELETE, 0, + { "name" => model.name, + "id" => id, + "key" => key + }.to_msgpack, + uniques.to_msgpack, + model.collections.to_msgpack + ) - t.watch(key) - t.watch(key[:_indices]) if model.indices.any? - t.watch(key[:_uniques]) if model.uniques.any? + return self + end - t.read do - existing_indices = _read_attributes(model.indices) if model.indices.any? - existing_uniques = _read_attributes(model.uniques) if model.uniques.any? - _uniques = db.hgetall(key[:_uniques]) - _indices = db.smembers(key[:_indices]) - end + # Run lua scripts and cache the sha in order to improve + # successive calls. + def script(file, *args) + cache = LUA_CACHE[redis.url] - t.write do - _delete_uniques(_uniques) - _delete_indices(_indices) - _delete_existing_uniques(existing_uniques) - _delete_existing_indices(existing_indices) - model.collections.each { |e| db.del(key[e]) } - db.srem(model.key[:all], id) - db.del(key[:counters]) - db.del(key) - end + if cache.key?(file) + sha = cache[file] + else + src = File.read(file) + sha = redis.call("SCRIPT", "LOAD", src) - yield t if block_given? + cache[file] = sha end + + redis.call("EVALSHA", sha, *args) end # Update the model attributes and call save. # # Example: @@ -1482,127 +1373,23 @@ [key[:indices][att][val]] end end def self.new_id - db.incr(key[:id]) + redis.call("INCR", key[:id]) end attr_writer :id - def transaction - txn = Transaction.new { |t| yield t } - txn.commit(db) - end - def model self.class end - def db - model.db + def redis + model.redis end def _initialize_id @id = model.new_id.to_s - end - - def _skip_empty(atts) - {}.tap do |ret| - atts.each do |att, val| - ret[att] = send(att).to_s unless val.to_s.empty? - end - - throw :empty if ret.empty? - end - end - - def _unique_keys - model.uniques.map { |att| model.key[:uniques][att] } - end - - def _save - catch :empty do - db.del(key) - db.hmset(key, *_skip_empty(attributes).to_a.flatten) - end - end - - def _verify_uniques - if att = _detect_duplicate - raise UniqueIndexViolation, "#{att} is not unique." - end - end - - def _detect_duplicate - model.uniques.detect do |att| - id = db.hget(model.key[:uniques][att], send(att)) - id && id != self.id.to_s - end - end - - def _read_index_type(type) - {}.tap do |ret| - model.send(type).each do |att| - ret[att] = send(att) - end - end - end - - def _save_uniques(uniques) - attrs = model.attributes - - uniques.each do |att, val| - unique = model.key[:uniques][att] - - db.hset(unique, val, id) - db.hset(key[:_uniques], unique, val) unless attrs.include?(att) - end - end - - def _delete_uniques(uniques) - uniques.each do |unique, val| - db.hdel(unique, val) - db.hdel(key[:_uniques], unique) - end - end - - def _delete_existing_indices(existing) - return unless existing - - existing = existing.map { |key, value| model.to_indices(key, value) } - existing.flatten!(1) - - _delete_indices(existing) - end - - def _delete_existing_uniques(existing) - return unless existing - - _delete_uniques(existing.map { |key, value| - [model.key[:uniques][key], value] - }) - end - - def _delete_indices(indices) - indices.each do |index| - db.srem(index, id) - db.srem(key[:_indices], index) - end - end - - def _save_indices(indices) - attrs = model.attributes - - indices.each do |att, val| - model.to_indices(att, val).each do |index| - db.sadd(index, id) - db.sadd(key[:_indices], index) unless attrs.include?(att) - end - end - end - - def _read_attributes(attrs) - Hash[attrs.zip(db.hmget(key, *attrs))] end end end