lib/ohm.rb in ohm-1.4.0 vs lib/ohm.rb in ohm-2.0.0.alpha1
- old
+ new
@@ -1,15 +1,17 @@
# encoding: UTF-8
-require "nest"
-require "redis"
+require "msgpack"
+require "nido"
+require "redic"
require "securerandom"
-require "scrivener"
-require "ohm/transaction"
require "ohm/command"
module Ohm
+ LUA_CACHE = Hash.new { |h, k| h[k] = Hash.new }
+ LUA_SAVE = File.expand_path("../ohm/lua/save.lua", __FILE__)
+ LUA_DELETE = File.expand_path("../ohm/lua/delete.lua", __FILE__)
# All of the known errors in Ohm can be traced back to one of these
# exceptions.
#
# MissingID:
@@ -66,80 +68,46 @@
when Symbol then context.const_get(name)
else name
end
end
- if Redis::VERSION >= "3.0.0"
- def self.dict(dict)
- dict
- end
- else
- def self.dict(arr)
- Hash[*arr]
- end
+ def self.dict(arr)
+ Hash[*arr]
end
- end
- class Connection
- attr_accessor :context
- attr_accessor :options
+ def self.sort(redis, key, options)
+ args = []
- def initialize(context = :main, options = {})
- @context = context
- @options = options
- end
+ args.concat(["BY", options[:by]]) if options[:by]
+ args.concat(["GET", options[:get]]) if options[:get]
+ args.concat(["LIMIT"] + options[:limit]) if options[:limit]
+ args.concat(options[:order].split(" ")) if options[:order]
+ args.concat(["STORE", options[:store]]) if options[:store]
- def reset!
- threaded[context] = nil
+ redis.call("SORT", key, *args)
end
-
- def start(options = {})
- self.options = options
- self.reset!
- end
-
- def redis
- threaded[context] ||= Redis.connect(options)
- end
-
- def threaded
- Thread.current[:ohm] ||= {}
- end
end
- def self.conn
- @conn ||= Connection.new
- end
-
- # Stores the connection options for the Redis instance.
- #
- # Examples:
- #
- # Ohm.connect(:port => 6380, :db => 1, :host => "10.0.1.1")
- # Ohm.connect(:url => "redis://10.0.1.1:6380/1")
- #
- # All of the options are simply passed on to `Redis.connect`.
- #
- def self.connect(options = {})
- conn.start(options)
- end
-
# Use this if you want to do quick ad hoc redis commands against the
# defined Ohm connection.
#
# Examples:
#
- # Ohm.redis.keys("User:*")
- # Ohm.redis.set("foo", "bar")
+ # Ohm.redis.call("SET", "foo", "bar")
+ # Ohm.redis.call("FLUSH")
#
def self.redis
- conn.redis
+ @redis ||= Redic.new
end
- # Wrapper for Ohm.redis.flushdb.
+ def self.redis=(redis)
+ @redis = redis
+ end
+
+ # Wrapper for Ohm.redis.call("FLUSHDB").
def self.flush
- redis.flushdb
+ redis.call("FLUSHDB")
end
module Collection
include Enumerable
@@ -162,23 +130,23 @@
size == 0
end
# Wraps the whole pipelining functionality.
def fetch(ids)
- arr = db.pipelined do
- ids.each { |id| db.hgetall(namespace[id]) }
+ ids.each do |id|
+ redis.queue("HGETALL", namespace[id])
end
- res = []
+ data = redis.commit
- return res if arr.nil?
+ return [] if data.nil?
- arr.each_with_index do |atts, idx|
- res << model.new(Utils.dict(atts).update(:id => ids[idx]))
+ [].tap do |result|
+ data.each_with_index do |atts, idx|
+ result << model.new(Utils.dict(atts).update(:id => ids[idx]))
+ end
end
-
- res
end
end
class List
include Collection
@@ -193,22 +161,22 @@
@model = model
end
# Returns the total size of the list using LLEN.
def size
- db.llen(key)
+ redis.call("LLEN", key)
end
alias :count :size
# Returns the first element of the list using LINDEX.
def first
- model[db.lindex(key, 0)]
+ model[redis.call("LINDEX", key, 0)]
end
# Returns the last element of the list using LINDEX.
def last
- model[db.lindex(key, -1)]
+ model[redis.call("LINDEX", key, -1)]
end
# Checks if the model is part of this List.
#
# An important thing to note is that this method loads all of the
@@ -237,24 +205,25 @@
# # => false
#
def replace(models)
ids = models.map { |model| model.id }
- model.db.multi do
- db.del(key)
- ids.each { |id| db.rpush(key, id) }
- end
+ redis.queue("MULTI")
+ redis.queue("DEL", key)
+ ids.each { |id| redis.queue("RPUSH", key, id) }
+ redis.queue("EXEC")
+ redis.commit
end
# Pushes the model to the _end_ of the list using RPUSH.
def push(model)
- db.rpush(key, model.id)
+ redis.call("RPUSH", key, model.id)
end
# Pushes the model to the _beginning_ of the list using LPUSH.
def unshift(model)
- db.lpush(key, model.id)
+ redis.call("LPUSH", key, model.id)
end
# Delete a model from the list.
#
# Note: If your list contains the model multiple times, this method
@@ -281,20 +250,20 @@
# # => true
#
def delete(model)
# LREM key 0 <id> means remove all elements matching <id>
# @see http://redis.io/commands/lrem
- db.lrem(key, 0, model.id)
+ redis.call("LREM", key, 0, model.id)
end
private
def ids
- db.lrange(key, 0, -1)
+ redis.call("LRANGE", key, 0, -1)
end
- def db
- model.db
+ def redis
+ model.redis
end
end
# Defines most of the methods used by `Set` and `MultiSet`.
class BasicSet
@@ -344,14 +313,14 @@
# # => true
#
def sort(options = {})
if options.has_key?(:get)
options[:get] = to_key(options[:get])
- return execute { |key| db.sort(key, options) }
+ return execute { |key| Utils.sort(redis, key, options) }
end
- fetch(execute { |key| db.sort(key, options) })
+ fetch(execute { |key| Utils.sort(redis, key, options) })
end
# Check if a model is included in this set.
#
# Example:
@@ -368,11 +337,11 @@
exists?(model.id)
end
# Returns the total size of the set using SCARD.
def size
- execute { |key| db.scard(key) }
+ execute { |key| redis.call("SCARD", key) }
end
alias :count :size
# Syntactic sugar for `sort_by` or `sort` when you only need the
# first element.
@@ -396,11 +365,11 @@
end
end
# Grab all the elements of this set using SMEMBERS.
def ids
- execute { |key| db.smembers(key) }
+ execute { |key| redis.call("SMEMBERS", key) }
end
# Retrieve a specific element using an ID from this set.
#
# Example:
@@ -415,11 +384,11 @@
model[id] if exists?(id)
end
private
def exists?(id)
- execute { |key| db.sismember(key, id) }
+ execute { |key| redis.call("SISMEMBER", key, id) == 1 }
end
def to_key(att)
if model.counters.include?(att)
namespace["*:counters->%s" % att]
@@ -465,24 +434,10 @@
#
def except(dict)
MultiSet.new(namespace, model, key).except(dict)
end
- # Perform an intersection between the existent set and
- # the new set created by the union of the passed filters.
- #
- # Example:
- #
- # set = User.find(:status => "active")
- # set.combine(:name => ["John", "Jane"])
- #
- # # The result will include all users with active status
- # # and with names "John" or "Jane".
- def combine(dict)
- MultiSet.new(namespace, model, key).combine(dict)
- end
-
# Do a union to the existing set using any number of filters.
#
# Example:
#
# set = User.find(:name => "John")
@@ -498,12 +453,12 @@
private
def execute
yield key
end
- def db
- model.db
+ def redis
+ model.redis
end
end
class MutableSet < Set
# Add a model directly to the set.
@@ -514,11 +469,11 @@
# post = Post.create
#
# user.posts.add(post)
#
def add(model)
- db.sadd(key, model.id)
+ redis.call("SADD", key, model.id)
end
alias_method :<<, :add
# Remove a model directly from the set.
@@ -529,11 +484,11 @@
# post = Post.create
#
# user.posts.delete(post)
#
def delete(model)
- db.srem(key, model.id)
+ redis.call("SREM", key, model.id)
end
# Replace all the existing elements of a set with a different
# collection of models. This happens atomically in a MULTI-EXEC
# block.
@@ -551,14 +506,15 @@
# # => false
#
def replace(models)
ids = models.map { |model| model.id }
- key.redis.multi do
- db.del(key)
- ids.each { |id| db.sadd(key, id) }
- end
+ redis.queue("MULTI")
+ redis.queue("DEL", key)
+ ids.each { |id| redis.queue("SADD", key, id) }
+ redis.queue("EXEC")
+ redis.commit
end
end
# Anytime you filter a set with more than one requirement, you
# internally use a `MultiSet`. `MutiSet` is a bit slower than just
@@ -610,30 +566,14 @@
# # You can also do it in one line.
# User.find(:name => "John").except(:country => "US")
#
def except(dict)
MultiSet.new(
- namespace, model, Command[:sdiffstore, command, unioned(dict)]
+ namespace, model, Command[:sdiffstore, command, intersected(dict)]
)
end
- # Perform an intersection between the existent set and
- # the new set created by the union of the passed filters.
- #
- # Example:
- #
- # set = User.find(:status => "active")
- # set.combine(:name => ["John", "Jane"])
- #
- # # The result will include all users with active status
- # # and with names "John" or "Jane".
- def combine(dict)
- MultiSet.new(
- namespace, model, Command[:sinterstore, command, unioned(dict)]
- )
- end
-
# Do a union to the existing set using any number of filters.
#
# Example:
#
# set = User.find(:name => "John")
@@ -647,34 +587,29 @@
namespace, model, Command[:sunionstore, command, intersected(dict)]
)
end
private
- def db
- model.db
+ def redis
+ model.redis
end
def intersected(dict)
Command[:sinterstore, *model.filters(dict)]
end
-
- def unioned(dict)
- Command[:sunionstore, *model.filters(dict)]
- end
-
def execute
# namespace[:tmp] is where all the temp keys should be stored in.
- # db will be where all the commands are executed against.
- res = command.call(namespace[:tmp], db)
+ # redis will be where all the commands are executed against.
+ response = command.call(namespace[:tmp], redis)
begin
# At this point, we have the final aggregated set, which we yield
# to the caller. the caller can do all the normal set operations,
# i.e. SCARD, SMEMBERS, etc.
- yield res
+ yield response
ensure
# We have to make sure we clean up the temporary keys to avoid
# memory leaks and the unintended explosion of memory usage.
@@ -731,48 +666,36 @@
# (For brevity, let's assume the Post created has an ID of 1).
#
# SADD User:1:posts 1
#
class Model
- include Scrivener::Validations
-
- def self.conn
- @conn ||= Connection.new(name, Ohm.conn.options)
+ def self.redis=(redis)
+ @redis = redis
end
- def self.connect(options)
- @key = nil
- @lua = nil
- conn.start(options)
+ def self.redis
+ @redis ||= Redic.new(Ohm.redis.url)
end
- def self.db
- conn.redis
- end
-
- def self.lua
- @lua ||= Lua.new(File.join(Dir.pwd, "lua"), db)
- end
-
# The namespace for all the keys generated using this model.
#
# Example:
#
# class User < Ohm::Model
#
# User.key == "User"
# User.key.kind_of?(String)
# # => true
#
- # User.key.kind_of?(Nest)
+ # User.key.kind_of?(Nido)
# # => true
#
- # To find out more about Nest, see:
- # http://github.com/soveran/nest
+ # To find out more about Nido, see:
+ # http://github.com/soveran/nido
#
def self.key
- @key ||= Nest.new(self.name, db)
+ @key ||= Nido.new(self.name)
end
# Retrieve a record by ID.
#
# Example:
@@ -801,11 +724,11 @@
lambda { |id| self[id] }
end
# Check if the ID exists within <Model>:all.
def self.exists?(id)
- db.sismember(key[:all], id)
+ redis.call("SISMEMBER", key[:all], id) == 1
end
# Find values in `unique` indices.
#
# Example:
@@ -819,11 +742,11 @@
# # => true
#
def self.with(att, val)
raise IndexNotFound unless uniques.include?(att)
- id = db.hget(key[:uniques][att], val)
+ id = redis.call("HGET", key[:uniques][att], val)
new(:id => id).load! if id
end
# Find values in indexed fields.
#
@@ -1102,11 +1025,11 @@
counters << name unless counters.include?(name)
define_method(name) do
return 0 if new?
- db.hget(key[:counters], name).to_i
+ redis.call("HGET", key[:counters], name).to_i
end
end
# An Ohm::Set wrapper for Model.key[:all].
def self.all
@@ -1180,11 +1103,11 @@
end
# Preload all the attributes of this model from Redis. Used
# internally by `Model::[]`.
def load!
- update_attributes(db.hgetall(key)) unless new?
+ update_attributes(Utils.dict(redis.call("HGETALL", key))) unless new?
return self
end
# Read an attribute remotely from Redis. Useful if you want to get
# the most recent value of the attribute and not rely on locally
@@ -1201,31 +1124,36 @@
# u.save |
# | u.name == "A"
# | u.get(:name) == "B"
#
def get(att)
- @attributes[att] = db.hget(key, att)
+ @attributes[att] = redis.call("HGET", key, att)
end
# Update an attribute value atomically. The best usecase for this
# is when you simply want to update one value.
#
# Note: This method is dangerous because it doesn't update indices
# and uniques. Use it wisely. The safe equivalent is `update`.
#
def set(att, val)
- val.to_s.empty? ? db.hdel(key, att) : db.hset(key, att, val)
+ if val.to_s.empty?
+ redis.call("HDEL", key, att)
+ else
+ redis.call("HSET", key, att, val)
+ end
+
@attributes[att] = val
end
def new?
!defined?(@id)
end
# Increment a counter atomically. Internally uses HINCRBY.
def incr(att, count = 1)
- db.hincrby(key[:counters], att, count)
+ redis.call("HINCRBY", key[:counters], att, count)
end
# Decrement a counter atomically. Internally uses HINCRBY.
def decr(att, count = 1)
incr(att, -count)
@@ -1250,12 +1178,12 @@
def attributes
@attributes
end
- # Export the ID and the errors of the model. The approach of Ohm
- # is to whitelist public attributes, as opposed to exporting each
+ # Export the ID of the model. The approach of Ohm is to
+ # whitelist public attributes, as opposed to exporting each
# (possibly sensitive) attribute.
#
# Example:
#
# class User < Ohm::Model
@@ -1281,95 +1209,60 @@
# # => { :id => "1", :name => "John" }
#
def to_hash
attrs = {}
attrs[:id] = id unless new?
- attrs[:errors] = errors if errors.any?
return attrs
end
+
# Persist the model attributes and update indices and unique
# indices. The `counter`s and `set`s are not touched during save.
#
- # If the model is not valid, nil is returned. Otherwise, the
- # persisted model is returned.
- #
# Example:
#
# class User < Ohm::Model
# attribute :name
- #
- # def validate
- # assert_present :name
- # end
# end
#
- # User.new(:name => nil).save
- # # => nil
- #
# u = User.new(:name => "John").save
# u.kind_of?(User)
# # => true
#
- def save(&block)
- return if not valid?
- save!(&block)
- end
+ def save
+ indices = {}
+ model.indices.each { |field| indices[field] = Array(send(field)) }
- # Saves the model without checking for validity. Refer to
- # `Model#save` for more details.
- def save!
- t = __save__
- yield t if block_given?
- t.commit(db)
+ uniques = {}
+ model.uniques.each { |field| uniques[field] = send(field) }
- return self
- end
+ _initialize_id if new?
- def __save__
- Transaction.new do |t|
- t.watch(*_unique_keys)
+ attrs = attributes.delete_if do |k, v|
+ v.nil?
+ end
- if not new?
- t.watch(key)
- t.watch(key[:_indices]) if model.indices.any?
- t.watch(key[:_uniques]) if model.uniques.any?
- end
+ response = script(LUA_SAVE, 0,
+ { "name" => model.name,
+ "id" => id,
+ "key" => key
+ }.to_msgpack,
+ attrs.flatten.to_msgpack,
+ indices.to_msgpack,
+ uniques.to_msgpack
+ )
- t.before do
- _initialize_id if new?
+ if response.is_a?(RuntimeError)
+ if response.message =~ /(UniqueIndexViolation: (\w+))/
+ raise UniqueIndexViolation, $1
+ else
+ raise response
end
-
- _uniques = nil
- uniques = nil
- _indices = nil
- indices = nil
- existing_indices = nil
- existing_uniques = nil
-
- t.read do
- _verify_uniques
- existing_indices = _read_attributes(model.indices) if model.indices.any?
- existing_uniques = _read_attributes(model.uniques) if model.uniques.any?
- _uniques = db.hgetall(key[:_uniques])
- _indices = db.smembers(key[:_indices])
- uniques = _read_index_type(:uniques)
- indices = _read_index_type(:indices)
- end
-
- t.write do
- db.sadd(model.key[:all], id)
- _delete_existing_indices(existing_indices)
- _delete_existing_uniques(existing_uniques)
- _delete_indices(_indices)
- _delete_uniques(_uniques)
- _save
- _save_indices(indices)
- _save_uniques(uniques)
- end
end
+
+ return self
end
# Delete the model, including all the following keys:
#
# - <Model>:<id>
@@ -1377,42 +1270,40 @@
# - <Model>:<id>:<set name>
#
# If the model has uniques or indices, they're also cleaned up.
#
def delete
- transaction do |t|
- _uniques = nil
- _indices = nil
- existing_indices = nil
- existing_uniques = nil
+ uniques = {}
+ model.uniques.each { |field| uniques[field] = send(field) }
- t.watch(*_unique_keys)
+ script(LUA_DELETE, 0,
+ { "name" => model.name,
+ "id" => id,
+ "key" => key
+ }.to_msgpack,
+ uniques.to_msgpack,
+ model.collections.to_msgpack
+ )
- t.watch(key)
- t.watch(key[:_indices]) if model.indices.any?
- t.watch(key[:_uniques]) if model.uniques.any?
+ return self
+ end
- t.read do
- existing_indices = _read_attributes(model.indices) if model.indices.any?
- existing_uniques = _read_attributes(model.uniques) if model.uniques.any?
- _uniques = db.hgetall(key[:_uniques])
- _indices = db.smembers(key[:_indices])
- end
+ # Run lua scripts and cache the sha in order to improve
+ # successive calls.
+ def script(file, *args)
+ cache = LUA_CACHE[redis.url]
- t.write do
- _delete_uniques(_uniques)
- _delete_indices(_indices)
- _delete_existing_uniques(existing_uniques)
- _delete_existing_indices(existing_indices)
- model.collections.each { |e| db.del(key[e]) }
- db.srem(model.key[:all], id)
- db.del(key[:counters])
- db.del(key)
- end
+ if cache.key?(file)
+ sha = cache[file]
+ else
+ src = File.read(file)
+ sha = redis.call("SCRIPT", "LOAD", src)
- yield t if block_given?
+ cache[file] = sha
end
+
+ redis.call("EVALSHA", sha, *args)
end
# Update the model attributes and call save.
#
# Example:
@@ -1482,127 +1373,23 @@
[key[:indices][att][val]]
end
end
def self.new_id
- db.incr(key[:id])
+ redis.call("INCR", key[:id])
end
attr_writer :id
- def transaction
- txn = Transaction.new { |t| yield t }
- txn.commit(db)
- end
-
def model
self.class
end
- def db
- model.db
+ def redis
+ model.redis
end
def _initialize_id
@id = model.new_id.to_s
- end
-
- def _skip_empty(atts)
- {}.tap do |ret|
- atts.each do |att, val|
- ret[att] = send(att).to_s unless val.to_s.empty?
- end
-
- throw :empty if ret.empty?
- end
- end
-
- def _unique_keys
- model.uniques.map { |att| model.key[:uniques][att] }
- end
-
- def _save
- catch :empty do
- db.del(key)
- db.hmset(key, *_skip_empty(attributes).to_a.flatten)
- end
- end
-
- def _verify_uniques
- if att = _detect_duplicate
- raise UniqueIndexViolation, "#{att} is not unique."
- end
- end
-
- def _detect_duplicate
- model.uniques.detect do |att|
- id = db.hget(model.key[:uniques][att], send(att))
- id && id != self.id.to_s
- end
- end
-
- def _read_index_type(type)
- {}.tap do |ret|
- model.send(type).each do |att|
- ret[att] = send(att)
- end
- end
- end
-
- def _save_uniques(uniques)
- attrs = model.attributes
-
- uniques.each do |att, val|
- unique = model.key[:uniques][att]
-
- db.hset(unique, val, id)
- db.hset(key[:_uniques], unique, val) unless attrs.include?(att)
- end
- end
-
- def _delete_uniques(uniques)
- uniques.each do |unique, val|
- db.hdel(unique, val)
- db.hdel(key[:_uniques], unique)
- end
- end
-
- def _delete_existing_indices(existing)
- return unless existing
-
- existing = existing.map { |key, value| model.to_indices(key, value) }
- existing.flatten!(1)
-
- _delete_indices(existing)
- end
-
- def _delete_existing_uniques(existing)
- return unless existing
-
- _delete_uniques(existing.map { |key, value|
- [model.key[:uniques][key], value]
- })
- end
-
- def _delete_indices(indices)
- indices.each do |index|
- db.srem(index, id)
- db.srem(key[:_indices], index)
- end
- end
-
- def _save_indices(indices)
- attrs = model.attributes
-
- indices.each do |att, val|
- model.to_indices(att, val).each do |index|
- db.sadd(index, id)
- db.sadd(key[:_indices], index) unless attrs.include?(att)
- end
- end
- end
-
- def _read_attributes(attrs)
- Hash[attrs.zip(db.hmget(key, *attrs))]
end
end
end