# encoding: UTF-8 require "base64" require File.join(File.dirname(__FILE__), "ohm", "redis") require File.join(File.dirname(__FILE__), "ohm", "validations") require File.join(File.dirname(__FILE__), "ohm", "compat-1.8.6") module Ohm # Provides access to the Redis database. This is shared accross all models and instances. def redis Thread.current[:redis] ||= Ohm::Redis.new(*options) end def redis=(connection) Thread.current[:redis] = connection end # Connect to a redis database. # # @param options [Hash] options to create a message with. # @option options [#to_s] :host ('127.0.0.1') Host of the redis database. # @option options [#to_s] :port (6379) Port number. # @option options [#to_s] :db (0) Database number. # @option options [#to_s] :timeout (0) Database timeout in seconds. # @example Connect to a database in port 6380. # Ohm.connect(:port => 6380) def connect(*options) self.redis = nil @options = options end def options @options end # Clear the database. def flush redis.flushdb end # Join the parameters with ":" to create a key. def key(*args) args.join(":") end module_function :key, :connect, :flush, :redis, :redis=, :options module Attributes class Collection include Enumerable attr_accessor :key, :db, :model def initialize(db, key, model = nil) self.db = db self.key = key self.model = model end def each(&block) all.each(&block) end # Return instances of model for all the ids contained in the collection. def all instantiate(raw) end # Return the values as model instances, ordered by the options supplied. # Check redis documentation to see what values you can provide to each option. # # @param options [Hash] options to sort the collection. # @option options [#to_s] :by Model attribute to sort the instances by. # @option options [#to_s] :order (ASC) Sorting order, which can be ASC or DESC. # @option options [Integer] :limit (all) Number of items to return. # @option options [Integer] :start (0) An offset from where the limit will be applied. # # @example Get the first ten users sorted alphabetically by name: # # @event.attendees.sort(:by => :name, :order => "ALPHA", :limit => 10) # # @example Get five posts sorted by number of votes and starting from the number 5 (zero based): # # @blog.posts.sort(:by => :votes, :start => 5, :limit => 10") def sort(options = {}) return [] if empty? options[:start] ||= 0 options[:limit] = [options[:start], options[:limit]] if options[:limit] result = db.sort(key, options) options[:get] ? result : instantiate(result) end # Sort the model instances by the given attribute. # # @example Sorting elements by name: # # User.create :name => "B" # User.create :name => "A" # # user = User.all.sort_by(:name, :order => "ALPHA").first # user.name == "A" # # => true def sort_by(att, options = {}) sort(options.merge(:by => model.key("*", att))) end # Sort the model instances by id and return the first instance # found. If a :by option is provided with a valid attribute name, the # method sort_by is used instead and the option provided is passed as the # first parameter. # # @see #sort # @see #sort_by # @return [Ohm::Model, nil] Returns the first instance found or nil. def first(options = {}) options = options.merge(:limit => 1) options[:by] ? sort_by(options.delete(:by), options).first : sort(options).first end def to_ary all end def ==(other) to_ary == other end # @return [true, false] Returns whether or not the collection is empty. def empty? size.zero? end # Clears the values in the collection. def clear db.del(key) self end # Appends the given values to the collection. def concat(values) values.each { |value| self << value } self end # Replaces the collection with the passed values. def replace(values) clear concat(values) end # @param value [Ohm::Model#id] Adds the id of the object if it's an Ohm::Model. def add(model) self << model.id end private def instantiate(raw) model ? raw.collect { |id| model[id] } : raw end end # Represents a Redis list. # # @example Use a list attribute. # # class Event < Ohm::Model # attribute :name # list :participants # end # # event = Event.create :name => "Redis Meeting" # event.participants << "Albert" # event.participants << "Benoit" # event.participants.all #=> ["Albert", "Benoit"] class List < Collection # @param value [#to_s] Pushes value to the tail of the list. def << value db.rpush(key, value) end alias push << # @return [String] Return and remove the last element of the list. def pop db.rpop(key) end # @return [String] Return and remove the first element of the list. def shift db.lpop(key) end # @param value [#to_s] Pushes value to the head of the list. def unshift(value) db.lpush(key, value) end # @return [Array] Elements of the list. def raw db.list(key) end # @return [Integer] Returns the number of elements in the list. def size db.llen(key) end def include?(value) raw.include?(value) end def inspect "#" end end # Represents a Redis set. # # @example Use a set attribute. # # class Company < Ohm::Model # attribute :name # set :employees # end # # company = Company.create :name => "Redis Co." # company.employees << "Albert" # company.employees << "Benoit" # company.employees.all #=> ["Albert", "Benoit"] # company.include?("Albert") #=> true class Set < Collection # @param value [#to_s] Adds value to the list. def << value db.sadd(key, value) end def delete(value) db.srem(key, value) end def include?(value) db.sismember(key, value) end def raw db.smembers(key) end # @return [Integer] Returns the number of elements in the set. def size db.scard(key) end def inspect "#" end # Returns an intersection with the sets generated from the passed hash. # # @see Ohm::Model.find # @example # @events = Event.find(public: true) # # # You can combine the result with sort and other set operations: # @events.sort_by(:name) def find(hash) apply(:sinterstore, hash, "+") end # Returns the difference between the receiver and the passed sets. # # @example # @events = Event.find(public: true).except(status: "sold_out") def except(hash) apply(:sdiffstore, hash, "-") end private # Apply a redis operation on a collection of sets. def apply(operation, hash, glue) indices = keys(hash).unshift(key).uniq target = indices.join(glue) db.send(operation, target, *indices) self.class.new(db, target, model) end # Transform a hash of attribute/values into an array of keys. def keys(hash) hash.inject([]) do |acc, t| acc + Array(t[1]).map do |v| model.index_key_for(t[0], v) end end end end class Index < Set def inspect "#" end def clear raise Ohm::Model::CannotDeleteIndex end end end Error = Class.new(StandardError) class Model module Validations include Ohm::Validations # Validates that the attribute or array of attributes are unique. For this, # an index of the same kind must exist. # # @overload assert_unique :name # Validates that the name attribute is unique. # @overload assert_unique [:street, :city] # Validates that the :street and :city pair is unique. def assert_unique(attrs) result = db.sinter(*Array(attrs).map { |att| index_key_for(att, send(att)) }) assert result.empty? || !new? && result.include?(id.to_s), [attrs, :not_unique] end end include Validations class MissingID < Error def message "You tried to perform an operation that needs the model ID, but it's not present." end end class CannotDeleteIndex < Error def message "You tried to delete an internal index used by Ohm." end end class IndexNotFound < Error def initialize(att) @att = att end def message "Index #{@att.inspect} not found." end end @@attributes = Hash.new { |hash, key| hash[key] = [] } @@collections = Hash.new { |hash, key| hash[key] = [] } @@counters = Hash.new { |hash, key| hash[key] = [] } @@indices = Hash.new { |hash, key| hash[key] = [] } attr_writer :id def id @id or raise MissingID end # Defines a string attribute for the model. This attribute will be persisted by Redis # as a string. Any value stored here will be retrieved in its string representation. # # @param name [Symbol] Name of the attribute. def self.attribute(name) define_method(name) do read_local(name) end define_method(:"#{name}=") do |value| write_local(name, value) end attributes << name unless attributes.include?(name) end # Defines a counter attribute for the model. This attribute can't be assigned, only incremented # or decremented. It will be zero by default. # # @param name [Symbol] Name of the counter. def self.counter(name) define_method(name) do read_local(name).to_i end counters << name unless counters.include?(name) end # Defines a list attribute for the model. It can be accessed only after the model instance # is created. # # @param name [Symbol] Name of the list. def self.list(name, model = nil) attr_list_reader(name, model) collections << name unless collections.include?(name) end # Defines a set attribute for the model. It can be accessed only after the model instance # is created. Sets are recommended when insertion and retrival order is irrelevant, and # operations like union, join, and membership checks are important. # # @param name [Symbol] Name of the set. def self.set(name, model = nil) attr_set_reader(name, model) collections << name unless collections.include?(name) end # Creates an index (a set) that will be used for finding instances. # # If you want to find a model instance by some attribute value, then an index for that # attribute must exist. # # @example # class User < Ohm::Model # attribute :email # index :email # end # # # Now this is possible: # User.find email: "ohm@example.com" # # @param name [Symbol] Name of the attribute to be indexed. def self.index(att) indices << att unless indices.include?(att) end def self.attr_list_reader(name, model = nil) define_method(name) do instance_variable_get("@#{name}") || instance_variable_set("@#{name}", Attributes::List.new(db, key(name), model)) end end def self.attr_set_reader(name, model) define_method(name) do instance_variable_get("@#{name}") || instance_variable_set("@#{name}", Attributes::Set.new(db, key(name), model)) end end def self.[](id) new(:id => id) if exists?(id) end def self.to_proc Proc.new { |id| self[id] } end def self.all @all ||= Attributes::Index.new(db, key(:all), self) end def self.attributes @@attributes[self] end def self.counters @@counters[self] end def self.collections @@collections[self] end def self.indices @@indices[self] end def self.create(*args) model = new(*args) model.create model end # Search across multiple indices and return the intersection of the sets. # # @example Finds all the user events for the supplied days # event1 = Event.create day: "2009-09-09", author: "Albert" # event2 = Event.create day: "2009-09-09", author: "Benoit" # event3 = Event.create day: "2009-09-10", author: "Albert" # # assert_equal [event1], Event.find(author: "Albert", day: "2009-09-09") def self.find(hash) all.find(hash) end def self.encode(value) Base64.encode64(value.to_s).gsub("\n", "") end def initialize(attrs = {}) @_attributes = Hash.new { |hash, key| hash[key] = read_remote(key) } update_attributes(attrs) end def new? !@id end def create return unless valid? initialize_id mutex do create_model_membership write add_to_indices end end def save return create if new? return unless valid? mutex do write update_indices end end def update(attrs) update_attributes(attrs) save end def update_attributes(attrs) attrs.each do |key, value| send(:"#{key}=", value) end end def delete delete_from_indices delete_attributes(attributes) delete_attributes(counters) delete_attributes(collections) delete_model_membership self end # Increment the counter denoted by :att. # # @param att [Symbol] Attribute to increment. def incr(att) raise ArgumentError, "#{att.inspect} is not a counter." unless counters.include?(att) write_local(att, db.incr(key(att))) end # Decrement the counter denoted by :att. # # @param att [Symbol] Attribute to decrement. def decr(att) raise ArgumentError, "#{att.inspect} is not a counter." unless counters.include?(att) write_local(att, db.decr(key(att))) end def attributes self.class.attributes end def counters self.class.counters end def collections self.class.collections end def indices self.class.indices end def ==(other) other.kind_of?(self.class) && other.key == key rescue MissingID false end # Lock the object before executing the block, and release it once the block is done. def mutex lock! yield unlock! self end def inspect everything = (attributes + collections + counters).map do |att| value = begin send(att) rescue MissingID nil end [att, value.inspect] end "#<#{self.class}:#{new? ? "?" : id} #{everything.map {|e| e.join("=") }.join(" ")}>" end protected def key(*args) self.class.key(id, *args) end # Use MSET if possible, SET otherwise. def write db.support_mset? ? write_with_mset : write_with_set end # Write attributes using SET # This method will be removed once MSET becomes standard. def write_with_set attributes.each do |att| (value = send(att)) ? db.set(key(att), value) : db.del(key(att)) end end # Write attributes using MSET # This is the preferred method, and will be the only option # available once MSET becomes standard. def write_with_mset unless attributes.empty? rems, adds = attributes.map { |a| [key(a), send(a)] }.partition { |t| t.last.nil? } db.del(*rems.flatten.compact) unless rems.empty? db.mset(adds.flatten) unless adds.empty? end end private def self.db Ohm.redis end def self.key(*args) Ohm.key(*args.unshift(self)) end def self.exists?(id) db.sismember(key(:all), id) end def initialize_id self.id = db.incr(self.class.key("id")).to_s end def db Ohm.redis end def delete_attributes(atts) atts.each do |att| db.del(key(att)) end end def create_model_membership db.sadd(self.class.key(:all), id) end def delete_model_membership db.srem(self.class.key(:all), id) end def update_indices delete_from_indices add_to_indices end def add_to_indices indices.each do |att| next add_to_index(att) unless collection?(send(att)) send(att).each { |value| add_to_index(att, value) } end end def collection?(value) self.class.collection?(value) end def self.collection?(value) value.kind_of?(Enumerable) && value.kind_of?(String) == false end def add_to_index(att, value = send(att)) index = index_key_for(att, value) db.sadd(index, id) db.sadd(key(:_indices), index) end def delete_from_indices db.smembers(key(:_indices)).each do |index| db.srem(index, id) end end def read_local(att) @_attributes[att] end def write_local(att, value) @_attributes[att] = value end def read_remote(att) db.get(key(att)) unless new? end def read_locals(attrs) attrs.map do |att| send(att) end end def read_remotes(attrs) attrs.map do |att| read_remote(att) end end def self.index_key_for(att, value) raise IndexNotFound, att unless indices.include?(att) key(att, encode(value)) end def index_key_for(att, value) self.class.index_key_for(att, value) end # Lock the object so no other instances can modify it. # This method implements the design pattern for locks # described at: http://code.google.com/p/redis/wiki/SetnxCommand # # @see Model#mutex def lock! until db.setnx(key(:_lock), lock_timeout) next unless lock = db.get(key(:_lock)) sleep(0.5) and next unless lock_expired?(lock) break unless lock = db.getset(key(:_lock), lock_timeout) break if lock_expired?(lock) end end # Release the lock. # @see Model#mutex def unlock! db.del(key(:_lock)) end def lock_timeout Time.now.to_f + 1 end def lock_expired? lock lock.to_f < Time.now.to_f end end end