require "monitor" class Redis class ProtocolError < RuntimeError def initialize(reply_type) super(<<-EOS.gsub(/(?:^|\n)\s*/, " ")) Got '#{reply_type}' as initial reply byte. If you're running in a multi-threaded environment, make sure you pass the :thread_safe option when initializing the connection. If you're in a forking environment, such as Unicorn, you need to connect to Redis after forking. EOS end end module DisableThreadSafety def synchronize yield end end def self.deprecate(message, trace = caller[0]) $stderr.puts "\n#{message} (in #{trace})" end attr :client def self.connect(options = {}) options = options.dup require "uri" url = URI(options.delete(:url) || ENV["REDIS_URL"] || "redis://127.0.0.1:6379/0") options[:host] ||= url.host options[:port] ||= url.port options[:password] ||= url.password options[:db] ||= url.path[1..-1].to_i new(options) end def self.current Thread.current[:redis] ||= Redis.connect end def self.current=(redis) Thread.current[:redis] = redis end include MonitorMixin def initialize(options = {}) @client = Client.new(options) if options[:thread_safe] == false # Override #synchronize extend DisableThreadSafety else # Monitor#initialize super() end end # Run code without the client reconnecting def without_reconnect(&block) synchronize do @client.without_reconnect(&block) end end # Authenticate to the server. def auth(password) synchronize do @client.call(:auth, password) end end # Change the selected database for the current connection. def select(db) synchronize do @client.db = db @client.call(:select, db) end end # Get information and statistics about the server. def info synchronize do reply = @client.call(:info) if reply.kind_of?(String) Hash[*reply.split(/:|\r\n/).grep(/^[^#]/)] else reply end end end def config(action, *args) synchronize do reply = @client.call(:config, action, *args) if reply.kind_of?(Array) && action == :get Hash[*reply] else reply end end end # Remove all keys from the current database. def flushdb synchronize do @client.call(:flushdb) end end # Remove all keys from all databases. def flushall synchronize do @client.call(:flushall) end end # Synchronously save the dataset to disk. def save synchronize do @client.call(:save) end end # Asynchronously save the dataset to disk. def bgsave synchronize do @client.call(:bgsave) end end # Asynchronously rewrite the append-only file. def bgrewriteaof synchronize do @client.call(:bgrewriteaof) end end # Get the value of a key. def get(key) synchronize do @client.call(:get, key) end end # Returns the bit value at offset in the string value stored at key. def getbit(key, offset) synchronize do @client.call(:getbit, key, offset) end end # Get a substring of the string stored at a key. def getrange(key, start, stop) synchronize do @client.call(:getrange, key, start, stop) end end # Set the string value of a key and return its old value. def getset(key, value) synchronize do @client.call(:getset, key, value) end end # Get the values of all the given keys. def mget(*keys) synchronize do @client.call(:mget, *keys) end end # Append a value to a key. def append(key, value) synchronize do @client.call(:append, key, value) end end def substr(key, start, stop) synchronize do @client.call(:substr, key, start, stop) end end # Get the length of the value stored in a key. def strlen(key) synchronize do @client.call(:strlen, key) end end # Get all the fields and values in a hash. def hgetall(key) synchronize do reply = @client.call(:hgetall, key) if reply.kind_of?(Array) Hash[*reply] else reply end end end # Get the value of a hash field. def hget(key, field) synchronize do @client.call(:hget, key, field) end end # Delete a hash field. def hdel(key, field) synchronize do @client.call(:hdel, key, field) end end # Get all the fields in a hash. def hkeys(key) synchronize do @client.call(:hkeys, key) end end # Find all keys matching the given pattern. def keys(pattern = "*") synchronize do reply = @client.call(:keys, pattern) if reply.kind_of?(String) reply.split(" ") else reply end end end # Return a random key from the keyspace. def randomkey synchronize do @client.call(:randomkey) end end # Echo the given string. def echo(value) synchronize do @client.call(:echo, value) end end # Ping the server. def ping synchronize do @client.call(:ping) end end # Get the UNIX time stamp of the last successful save to disk. def lastsave synchronize do @client.call(:lastsave) end end # Return the number of keys in the selected database. def dbsize synchronize do @client.call(:dbsize) end end # Determine if a key exists. def exists(key) synchronize do _bool @client.call(:exists, key) end end # Get the length of a list. def llen(key) synchronize do @client.call(:llen, key) end end # Get a range of elements from a list. def lrange(key, start, stop) synchronize do @client.call(:lrange, key, start, stop) end end # Trim a list to the specified range. def ltrim(key, start, stop) synchronize do @client.call(:ltrim, key, start, stop) end end # Get an element from a list by its index. def lindex(key, index) synchronize do @client.call(:lindex, key, index) end end # Insert an element before or after another element in a list. def linsert(key, where, pivot, value) synchronize do @client.call(:linsert, key, where, pivot, value) end end # Set the value of an element in a list by its index. def lset(key, index, value) synchronize do @client.call(:lset, key, index, value) end end # Remove elements from a list. def lrem(key, count, value) synchronize do @client.call(:lrem, key, count, value) end end # Append a value to a list. def rpush(key, value) synchronize do @client.call(:rpush, key, value) end end # Append a value to a list, only if the list exists. def rpushx(key, value) synchronize do @client.call(:rpushx, key, value) end end # Prepend a value to a list. def lpush(key, value) synchronize do @client.call(:lpush, key, value) end end # Prepend a value to a list, only if the list exists. def lpushx(key, value) synchronize do @client.call(:lpushx, key, value) end end # Remove and get the last element in a list. def rpop(key) synchronize do @client.call(:rpop, key) end end # Remove and get the first element in a list, or block until one is available. def blpop(*args) synchronize do @client.call_without_timeout(:blpop, *args) end end # Remove and get the last element in a list, or block until one is available. def brpop(*args) synchronize do @client.call_without_timeout(:brpop, *args) end end # Pop a value from a list, push it to another list and return it; or block # until one is available. def brpoplpush(source, destination, timeout) synchronize do @client.call_without_timeout(:brpoplpush, source, destination, timeout) end end # Remove the last element in a list, append it to another list and return it. def rpoplpush(source, destination) synchronize do @client.call(:rpoplpush, source, destination) end end # Remove and get the first element in a list. def lpop(key) synchronize do @client.call(:lpop, key) end end # Get all the members in a set. def smembers(key) synchronize do @client.call(:smembers, key) end end # Determine if a given value is a member of a set. def sismember(key, member) synchronize do _bool @client.call(:sismember, key, member) end end # Add a member to a set. def sadd(key, value) synchronize do _bool @client.call(:sadd, key, value) end end # Remove a member from a set. def srem(key, value) synchronize do _bool @client.call(:srem, key, value) end end # Move a member from one set to another. def smove(source, destination, member) synchronize do _bool @client.call(:smove, source, destination, member) end end # Remove and return a random member from a set. def spop(key) synchronize do @client.call(:spop, key) end end # Get the number of members in a set. def scard(key) synchronize do @client.call(:scard, key) end end # Intersect multiple sets. def sinter(*keys) synchronize do @client.call(:sinter, *keys) end end # Intersect multiple sets and store the resulting set in a key. def sinterstore(destination, *keys) synchronize do @client.call(:sinterstore, destination, *keys) end end # Add multiple sets. def sunion(*keys) synchronize do @client.call(:sunion, *keys) end end # Add multiple sets and store the resulting set in a key. def sunionstore(destination, *keys) synchronize do @client.call(:sunionstore, destination, *keys) end end # Subtract multiple sets. def sdiff(*keys) synchronize do @client.call(:sdiff, *keys) end end # Subtract multiple sets and store the resulting set in a key. def sdiffstore(destination, *keys) synchronize do @client.call(:sdiffstore, destination, *keys) end end # Get a random member from a set. def srandmember(key) synchronize do @client.call(:srandmember, key) end end # Add a member to a sorted set, or update its score if it already exists. def zadd(key, score, member) synchronize do _bool @client.call(:zadd, key, score, member) end end # Determine the index of a member in a sorted set. def zrank(key, member) synchronize do @client.call(:zrank, key, member) end end # Determine the index of a member in a sorted set, with scores ordered from # high to low. def zrevrank(key, member) synchronize do @client.call(:zrevrank, key, member) end end # Increment the score of a member in a sorted set. def zincrby(key, increment, member) synchronize do @client.call(:zincrby, key, increment, member) end end # Get the number of members in a sorted set. def zcard(key) synchronize do @client.call(:zcard, key) end end # Return a range of members in a sorted set, by index. def zrange(key, start, stop, options = {}) command = CommandOptions.new(options) do |c| c.bool :withscores c.bool :with_scores end synchronize do @client.call(:zrange, key, start, stop, *command.to_a) end end # Return a range of members in a sorted set, by score. def zrangebyscore(key, min, max, options = {}) command = CommandOptions.new(options) do |c| c.splat :limit c.bool :withscores c.bool :with_scores end synchronize do @client.call(:zrangebyscore, key, min, max, *command.to_a) end end # Count the members in a sorted set with scores within the given values. def zcount(key, start, stop) synchronize do @client.call(:zcount, key, start, stop) end end # Return a range of members in a sorted set, by index, with scores ordered # from high to low. def zrevrange(key, start, stop, options = {}) command = CommandOptions.new(options) do |c| c.bool :withscores c.bool :with_scores end synchronize do @client.call(:zrevrange, key, start, stop, *command.to_a) end end # Return a range of members in a sorted set, by score, with scores ordered # from high to low. def zrevrangebyscore(key, max, min, options = {}) command = CommandOptions.new(options) do |c| c.splat :limit c.bool :withscores c.bool :with_scores end synchronize do @client.call(:zrevrangebyscore, key, max, min, *command.to_a) end end # Remove all members in a sorted set within the given scores. def zremrangebyscore(key, min, max) synchronize do @client.call(:zremrangebyscore, key, min, max) end end # Remove all members in a sorted set within the given indexes. def zremrangebyrank(key, start, stop) synchronize do @client.call(:zremrangebyrank, key, start, stop) end end # Get the score associated with the given member in a sorted set. def zscore(key, member) synchronize do @client.call(:zscore, key, member) end end # Remove a member from a sorted set. def zrem(key, member) synchronize do _bool @client.call(:zrem, key, member) end end # Intersect multiple sorted sets and store the resulting sorted set in a new # key. def zinterstore(destination, keys, options = {}) command = CommandOptions.new(options) do |c| c.splat :weights c.value :aggregate end synchronize do @client.call(:zinterstore, destination, keys.size, *(keys + command.to_a)) end end # Add multiple sorted sets and store the resulting sorted set in a new key. def zunionstore(destination, keys, options = {}) command = CommandOptions.new(options) do |c| c.splat :weights c.value :aggregate end synchronize do @client.call(:zunionstore, destination, keys.size, *(keys + command.to_a)) end end # Move a key to another database. def move(key, db) synchronize do _bool @client.call(:move, key, db) end end # Set the value of a key, only if the key does not exist. def setnx(key, value) synchronize do _bool @client.call(:setnx, key, value) end end # Delete a key. def del(*keys) synchronize do @client.call(:del, *keys) end end # Rename a key. def rename(old_name, new_name) synchronize do @client.call(:rename, old_name, new_name) end end # Rename a key, only if the new key does not exist. def renamenx(old_name, new_name) synchronize do _bool @client.call(:renamenx, old_name, new_name) end end # Set a key's time to live in seconds. def expire(key, seconds) synchronize do _bool @client.call(:expire, key, seconds) end end # Remove the expiration from a key. def persist(key) synchronize do _bool @client.call(:persist, key) end end # Get the time to live for a key. def ttl(key) synchronize do @client.call(:ttl, key) end end # Set the expiration for a key as a UNIX timestamp. def expireat(key, unix_time) synchronize do _bool @client.call(:expireat, key, unix_time) end end # Set the string value of a hash field. def hset(key, field, value) synchronize do _bool @client.call(:hset, key, field, value) end end # Set the value of a hash field, only if the field does not exist. def hsetnx(key, field, value) synchronize do _bool @client.call(:hsetnx, key, field, value) end end # Set multiple hash fields to multiple values. def hmset(key, *attrs) synchronize do @client.call(:hmset, key, *attrs) end end def mapped_hmset(key, hash) hmset(key, *hash.to_a.flatten) end # Get the values of all the given hash fields. def hmget(key, *fields) synchronize do @client.call(:hmget, key, *fields) end end def mapped_hmget(key, *fields) reply = hmget(key, *fields) if reply.kind_of?(Array) Hash[*fields.zip(reply).flatten] else reply end end # Get the number of fields in a hash. def hlen(key) synchronize do @client.call(:hlen, key) end end # Get all the values in a hash. def hvals(key) synchronize do @client.call(:hvals, key) end end # Increment the integer value of a hash field by the given number. def hincrby(key, field, increment) synchronize do @client.call(:hincrby, key, field, increment) end end # Discard all commands issued after MULTI. def discard synchronize do @client.call(:discard) end end # Determine if a hash field exists. def hexists(key, field) synchronize do _bool @client.call(:hexists, key, field) end end # Listen for all requests received by the server in real time. def monitor(&block) synchronize do @client.call_loop(:monitor, &block) end end def debug(*args) synchronize do @client.call(:debug, *args) end end # Internal command used for replication. def sync synchronize do @client.call(:sync) end end def [](key) get(key) end def []=(key,value) set(key, value) end # Set the string value of a key. def set(key, value) synchronize do @client.call(:set, key, value) end end # Sets or clears the bit at offset in the string value stored at key. def setbit(key, offset, value) synchronize do @client.call(:setbit, key, offset, value) end end # Set the value and expiration of a key. def setex(key, ttl, value) synchronize do @client.call(:setex, key, ttl, value) end end # Overwrite part of a string at key starting at the specified offset. def setrange(key, offset, value) synchronize do @client.call(:setrange, key, offset, value) end end # Set multiple keys to multiple values. def mset(*args) synchronize do @client.call(:mset, *args) end end def mapped_mset(hash) mset(*hash.to_a.flatten) end # Set multiple keys to multiple values, only if none of the keys exist. def msetnx(*args) synchronize do @client.call(:msetnx, *args) end end def mapped_msetnx(hash) msetnx(*hash.to_a.flatten) end def mapped_mget(*keys) reply = mget(*keys) if reply.kind_of?(Array) Hash[*keys.zip(reply).flatten] else reply end end # Sort the elements in a list, set or sorted set. def sort(key, options = {}) command = CommandOptions.new(options) do |c| c.value :by c.splat :limit c.multi :get c.words :order c.value :store end synchronize do @client.call(:sort, key, *command.to_a) end end # Increment the integer value of a key by one. def incr(key) synchronize do @client.call(:incr, key) end end # Increment the integer value of a key by the given number. def incrby(key, increment) synchronize do @client.call(:incrby, key, increment) end end # Decrement the integer value of a key by one. def decr(key) synchronize do @client.call(:decr, key) end end # Decrement the integer value of a key by the given number. def decrby(key, decrement) synchronize do @client.call(:decrby, key, decrement) end end # Determine the type stored at key. def type(key) synchronize do @client.call(:type, key) end end # Close the connection. def quit synchronize do begin @client.call(:quit) rescue Errno::ECONNRESET ensure @client.disconnect end end end # Synchronously save the dataset to disk and then shut down the server. def shutdown synchronize do @client.call(:shutdown) end end # Make the server a slave of another instance, or promote it as master. def slaveof(host, port) synchronize do @client.call(:slaveof, host, port) end end def pipelined(options = {}) synchronize do begin original, @client = @client, Pipeline.new yield original.call_pipelined(@client.commands, options) unless @client.commands.empty? ensure @client = original end end end # Watch the given keys to determine execution of the MULTI/EXEC block. def watch(*keys) synchronize do @client.call(:watch, *keys) end end # Forget about all watched keys. def unwatch synchronize do @client.call(:unwatch) end end # Execute all commands issued after MULTI. def exec synchronize do @client.call(:exec) end end # Mark the start of a transaction block. def multi synchronize do if !block_given? @client.call :multi else result = pipelined(:raise => false) do multi yield(self) exec end result.last end end end # Post a message to a channel. def publish(channel, message) synchronize do @client.call(:publish, channel, message) end end def subscribed? synchronize do @client.kind_of? SubscribedClient end end # Stop listening for messages posted to the given channels. def unsubscribe(*channels) synchronize do raise RuntimeError, "Can't unsubscribe if not subscribed." unless subscribed? @client.unsubscribe(*channels) end end # Stop listening for messages posted to channels matching the given patterns. def punsubscribe(*channels) synchronize do raise RuntimeError, "Can't unsubscribe if not subscribed." unless subscribed? @client.punsubscribe(*channels) end end # Listen for messages published to the given channels. def subscribe(*channels, &block) synchronize do subscription(:subscribe, channels, block) end end # Listen for messages published to channels matching the given patterns. def psubscribe(*channels, &block) synchronize do subscription(:psubscribe, channels, block) end end def id synchronize do @client.id end end def inspect synchronize do "#" end end def method_missing(command, *args) synchronize do @client.call(command, *args) end end class CommandOptions def initialize(options) @result = [] @options = options yield(self) end def bool(name) insert(name) { |argument, value| [argument] } end def value(name) insert(name) { |argument, value| [argument, value] } end def splat(name) insert(name) { |argument, value| [argument, *value] } end def multi(name) insert(name) { |argument, value| [argument].product(Array(value)).flatten } end def words(name) insert(name) { |argument, value| value.split(" ") } end def to_a @result end def insert(name) @result += yield(name.to_s.upcase.gsub("_", ""), @options[name]) if @options[name] end end private def _bool(value) value == 1 end def subscription(method, channels, block) return @client.call(method, *channels) if subscribed? begin original, @client = @client, SubscribedClient.new(@client) @client.send(method, *channels, &block) ensure @client = original end end end require "redis/version" require "redis/connection" require "redis/client" require "redis/pipeline" require "redis/subscribe" require "redis/compat"