lib/redis.rb in redis-3.2.2 vs lib/redis.rb in redis-3.3.0

- old
+ new

@@ -47,10 +47,11 @@ # # @return [Redis] a new client instance def initialize(options = {}) @options = options.dup @original_client = @client = Client.new(options) + @queue = Hash.new { |h, k| h[k] = [] } super() # Monitor#initialize end def synchronize @@ -73,14 +74,53 @@ def connected? @original_client.connected? end # Disconnect the client as quickly and silently as possible. - def disconnect! + def close @original_client.disconnect end + alias disconnect! close + # Sends a command to Redis and returns its reply. + # + # Replies are converted to Ruby objects according to the RESP protocol, so + # you can expect a Ruby array, integer or nil when Redis sends one. Higher + # level transformations, such as converting an array of pairs into a Ruby + # hash, are up to consumers. + # + # Redis error replies are raised as Ruby exceptions. + def call(*command) + synchronize do |client| + client.call(command) + end + end + + # Queues a command for pipelining. + # + # Commands in the queue are executed with the Redis#commit method. + # + # See http://redis.io/topics/pipelining for more details. + # + def queue(*command) + @queue[Thread.current.object_id] << command + end + + # Sends all commands in the queue. + # + # See http://redis.io/topics/pipelining for more details. + # + def commit + synchronize do |client| + begin + client.call_pipelined(@queue[Thread.current.object_id]) + ensure + @queue.delete(Thread.current.object_id) + end + end + end + # Authenticate to the server. # # @param [String] password must match the password specified in the # `requirepass` directive in the configuration file # @return [String] `OK` @@ -159,11 +199,11 @@ # property with `CONFIG GET` def config(action, *args) synchronize do |client| client.call([:config, action] + args) do |reply| if reply.kind_of?(Array) && action == :get - Hash[_pairify(reply)] + Hashify.call(reply) else reply end end end @@ -318,33 +358,33 @@ # # @param [String] key # @return [Boolean] whether the timeout was removed or not def persist(key) synchronize do |client| - client.call([:persist, key], &_boolify) + client.call([:persist, key], &Boolify) end end # Set a key's time to live in seconds. # # @param [String] key # @param [Fixnum] seconds time to live # @return [Boolean] whether the timeout was set or not def expire(key, seconds) synchronize do |client| - client.call([:expire, key, seconds], &_boolify) + client.call([:expire, key, seconds], &Boolify) end end # Set the expiration for a key as a UNIX timestamp. # # @param [String] key # @param [Fixnum] unix_time expiry time specified as a UNIX timestamp # @return [Boolean] whether the timeout was set or not def expireat(key, unix_time) synchronize do |client| - client.call([:expireat, key, unix_time], &_boolify) + client.call([:expireat, key, unix_time], &Boolify) end end # Get the time to live (in seconds) for a key. # @@ -369,22 +409,22 @@ # @param [String] key # @param [Fixnum] milliseconds time to live # @return [Boolean] whether the timeout was set or not def pexpire(key, milliseconds) synchronize do |client| - client.call([:pexpire, key, milliseconds], &_boolify) + client.call([:pexpire, key, milliseconds], &Boolify) end end # Set the expiration for a key as number of milliseconds from UNIX Epoch. # # @param [String] key # @param [Fixnum] ms_unix_time expiry time specified as number of milliseconds from UNIX Epoch. # @return [Boolean] whether the timeout was set or not def pexpireat(key, ms_unix_time) synchronize do |client| - client.call([:pexpireat, key, ms_unix_time], &_boolify) + client.call([:pexpireat, key, ms_unix_time], &Boolify) end end # Get the time to live (in milliseconds) for a key. # @@ -459,11 +499,11 @@ # # @param [String] key # @return [Boolean] def exists(key) synchronize do |client| - client.call([:exists, key], &_boolify) + client.call([:exists, key], &Boolify) end end # Find all keys matching the given pattern. # @@ -500,11 +540,11 @@ # @param [String] key # @param [Fixnum] db # @return [Boolean] whether the key was moved or not def move(key, db) synchronize do |client| - client.call([:move, key, db], &_boolify) + client.call([:move, key, db], &Boolify) end end def object(*args) synchronize do |client| @@ -537,11 +577,11 @@ # @param [String] old_name # @param [String] new_name # @return [Boolean] whether the key was renamed or not def renamenx(old_name, new_name) synchronize do |client| - client.call([:renamenx, old_name, new_name], &_boolify) + client.call([:renamenx, old_name, new_name], &Boolify) end end # Sort the elements in a list, set or sorted set. # @@ -676,11 +716,11 @@ # @param [String] key # @param [Float] increment # @return [Float] value after incrementing it def incrbyfloat(key, increment) synchronize do |client| - client.call([:incrbyfloat, key, increment], &_floatify) + client.call([:incrbyfloat, key, increment], &Floatify) end end # Set the string value of a key. # @@ -707,11 +747,11 @@ xx = options[:xx] args.concat(["XX"]) if xx synchronize do |client| if nx || xx - client.call([:set, key, value.to_s] + args, &_boolify_set) + client.call([:set, key, value.to_s] + args, &BoolifySet) else client.call([:set, key, value.to_s] + args) end end end @@ -747,11 +787,11 @@ # @param [String] key # @param [String] value # @return [Boolean] whether the key was set or not def setnx(key, value) synchronize do |client| - client.call([:setnx, key, value.to_s], &_boolify) + client.call([:setnx, key, value.to_s], &Boolify) end end # Set one or more values. # @@ -793,11 +833,11 @@ # @return [Boolean] whether or not all values were set # # @see #mapped_msetnx def msetnx(*args) synchronize do |client| - client.call([:msetnx] + args, &_boolify) + client.call([:msetnx] + args, &Boolify) end end # Set one or more values, only if none of the keys exist. # @@ -842,11 +882,11 @@ end # Get the values of all the given keys. # # @example - # redis.mapped_mget("key1", "key1") + # redis.mapped_mget("key1", "key2") # # => { "key1" => "v1", "key2" => "v2" } # # @param [Array<String>] keys array of keys # @return [Hash] a hash mapping the specified keys to their values # @@ -1266,11 +1306,11 @@ if member.is_a? Array # Variadic: return integer reply else # Single argument: return boolean - _boolify.call(reply) + Boolify.call(reply) end end end end @@ -1288,11 +1328,11 @@ if member.is_a? Array # Variadic: return integer reply else # Single argument: return boolean - _boolify.call(reply) + Boolify.call(reply) end end end end @@ -1327,22 +1367,22 @@ # @param [String] destination destination key # @param [String] member member to move from `source` to `destination` # @return [Boolean] def smove(source, destination, member) synchronize do |client| - client.call([:smove, source, destination, member], &_boolify) + client.call([:smove, source, destination, member], &Boolify) end end # Determine if a given value is a member of a set. # # @param [String] key # @param [String] member # @return [Boolean] def sismember(key, member) synchronize do |client| - client.call([:sismember, key, member], &_boolify) + client.call([:sismember, key, member], &Boolify) end end # Get all the members in a set. # @@ -1481,14 +1521,14 @@ end synchronize do |client| if args.size == 1 && args[0].is_a?(Array) # Variadic: return float if INCR, integer if !INCR - client.call([:zadd, key] + zadd_options + args[0], &(incr ? _floatify : _identity)) + client.call([:zadd, key] + zadd_options + args[0], &(incr ? Floatify : nil)) elsif args.size == 2 # Single pair: return float if INCR, boolean if !INCR - client.call([:zadd, key] + zadd_options + args, &(incr ? _floatify : _boolify)) + client.call([:zadd, key] + zadd_options + args, &(incr ? Floatify : Boolify)) else raise ArgumentError, "wrong number of arguments" end end end @@ -1503,11 +1543,11 @@ # @param [Float] increment # @param [String] member # @return [Float] score of the member after incrementing it def zincrby(key, increment, member) synchronize do |client| - client.call([:zincrby, key, increment, member], &_floatify) + client.call([:zincrby, key, increment, member], &Floatify) end end # Remove one or more members from a sorted set. # @@ -1532,11 +1572,11 @@ if member.is_a? Array # Variadic: return integer reply else # Single argument: return boolean - _boolify.call(reply) + Boolify.call(reply) end end end end @@ -1549,11 +1589,11 @@ # @param [String] key # @param [String] member # @return [Float] score of the member def zscore(key, member) synchronize do |client| - client.call([:zscore, key, member], &_floatify) + client.call([:zscore, key, member], &Floatify) end end # Return a range of members in a sorted set, by index. # @@ -1578,11 +1618,11 @@ with_scores = options[:with_scores] || options[:withscores] if with_scores args << "WITHSCORES" - block = _floatify_pairs + block = FloatifyPairs end synchronize do |client| client.call([:zrange, key, start, stop] + args, &block) end @@ -1604,11 +1644,11 @@ with_scores = options[:with_scores] || options[:withscores] if with_scores args << "WITHSCORES" - block = _floatify_pairs + block = FloatifyPairs end synchronize do |client| client.call([:zrevrange, key, start, stop] + args, &block) end @@ -1742,11 +1782,11 @@ with_scores = options[:with_scores] || options[:withscores] if with_scores args << "WITHSCORES" - block = _floatify_pairs + block = FloatifyPairs end limit = options[:limit] args.concat(["LIMIT"] + limit) if limit @@ -1774,11 +1814,11 @@ with_scores = options[:with_scores] || options[:withscores] if with_scores args << ["WITHSCORES"] - block = _floatify_pairs + block = FloatifyPairs end limit = options[:limit] args.concat(["LIMIT"] + limit) if limit @@ -1904,11 +1944,11 @@ # @param [String] field # @param [String] value # @return [Boolean] whether or not the field was **added** to the hash def hset(key, field, value) synchronize do |client| - client.call([:hset, key, field, value], &_boolify) + client.call([:hset, key, field, value], &Boolify) end end # Set the value of a hash field, only if the field does not exist. # @@ -1916,11 +1956,11 @@ # @param [String] field # @param [String] value # @return [Boolean] whether or not the field was **added** to the hash def hsetnx(key, field, value) synchronize do |client| - client.call([:hsetnx, key, field, value], &_boolify) + client.call([:hsetnx, key, field, value], &Boolify) end end # Set one or more hash values. # @@ -2019,11 +2059,11 @@ # @param [String] key # @param [String] field # @return [Boolean] whether or not the field exists in the hash def hexists(key, field) synchronize do |client| - client.call([:hexists, key, field], &_boolify) + client.call([:hexists, key, field], &Boolify) end end # Increment the integer value of a hash field by the given integer number. # @@ -2043,11 +2083,11 @@ # @param [String] field # @param [Float] increment # @return [Float] value of the field after incrementing it def hincrbyfloat(key, field, increment) synchronize do |client| - client.call([:hincrbyfloat, key, field, increment], &_floatify) + client.call([:hincrbyfloat, key, field, increment], &Floatify) end end # Get all the fields in a hash. # @@ -2073,11 +2113,11 @@ # # @param [String] key # @return [Hash<String, String>] def hgetall(key) synchronize do |client| - client.call([:hgetall, key], &_hashify) + client.call([:hgetall, key], &Hashify) end end # Post a message to a channel. def publish(channel, message) @@ -2093,14 +2133,21 @@ end # Listen for messages published to the given channels. def subscribe(*channels, &block) synchronize do |client| - _subscription(:subscribe, channels, block) + _subscription(:subscribe, 0, channels, block) end end + # Listen for messages published to the given channels. Throw a timeout error if there is no messages for a timeout period. + def subscribe_with_timeout(timeout, *channels, &block) + synchronize do |client| + _subscription(:subscribe_with_timeout, timeout, channels, block) + end + end + # Stop listening for messages posted to the given channels. def unsubscribe(*channels) synchronize do |client| raise RuntimeError, "Can't unsubscribe if not subscribed." unless subscribed? client.unsubscribe(*channels) @@ -2108,14 +2155,21 @@ end # Listen for messages published to channels matching the given patterns. def psubscribe(*channels, &block) synchronize do |client| - _subscription(:psubscribe, channels, block) + _subscription(:psubscribe, 0, channels, block) end end + # Listen for messages published to channels matching the given patterns. Throw a timeout error if there is no messages for a timeout period. + def psubscribe_with_timeout(timeout, *channels, &block) + synchronize do |client| + _subscription(:psubscribe_with_timeout, timeout, channels, block) + end + end + # Stop listening for messages posted to channels matching the given patterns. def punsubscribe(*channels) synchronize do |client| raise RuntimeError, "Can't unsubscribe if not subscribed." unless subscribed? client.punsubscribe(*channels) @@ -2310,11 +2364,11 @@ if subcommand == "exists" synchronize do |client| arg = args.first client.call([:script, :exists, arg]) do |reply| - reply = reply.map { |r| _boolify.call(r) } + reply = reply.map { |r| Boolify.call(r) } if arg.is_a?(Array) reply else reply.first @@ -2464,11 +2518,11 @@ # - `:count => Integer`: return count keys at most per iteration # # @return [String, Array<[String, String]>] the next cursor and all found keys def hscan(key, cursor, options={}) _scan(:hscan, cursor, [key], options) do |reply| - [reply[0], _pairify(reply[1])] + [reply[0], reply[1].each_slice(2).to_a] end end # Scan a hash # @@ -2503,11 +2557,11 @@ # # @return [String, Array<[String, Float]>] the next cursor and all found # members and scores def zscan(key, cursor, options={}) _scan(:zscan, cursor, [key], options) do |reply| - [reply[0], _floatify_pairs.call(reply[1])] + [reply[0], FloatifyPairs.call(reply[1])] end end # Scan a sorted set # @@ -2571,11 +2625,11 @@ # @param [String] key # @param [String, Array<String>] member one member, or array of members # @return [Boolean] true if at least 1 HyperLogLog internal register was altered. false otherwise. def pfadd(key, member) synchronize do |client| - client.call([:pfadd, key, member], &_boolify) + client.call([:pfadd, key, member], &Boolify) end end # Get the approximate cardinality of members added to HyperLogLog structure. # @@ -2596,11 +2650,11 @@ # @param [String] dest_key destination key # @param [String, Array<String>] source_key source key, or array of keys # @return [Boolean] def pfmerge(dest_key, *source_key) synchronize do |client| - client.call([:pfmerge, dest_key, *source_key], &_boolify_set) + client.call([:pfmerge, dest_key, *source_key], &BoolifySet) end end # Interact with the sentinel command (masters, master, slaves, failover) # @@ -2615,13 +2669,13 @@ when "get-master-addr-by-name" reply else if reply.kind_of?(Array) if reply[0].kind_of?(Array) - reply.map(&_hashify) + reply.map(&Hashify) else - _hashify.call(reply) + Hashify.call(reply) end else reply end end @@ -2650,73 +2704,62 @@ private # Commands returning 1 for true and 0 for false may be executed in a pipeline # where the method call will return nil. Propagate the nil instead of falsely # returning false. - def _boolify + Boolify = lambda { |value| value == 1 if value } - end - def _boolify_set + BoolifySet = lambda { |value| if value && "OK" == value true else false end } - end - def _hashify + Hashify = lambda { |array| hash = Hash.new array.each_slice(2) do |field, value| hash[field] = value end hash } - end - def _floatify + Floatify = lambda { |str| - return unless str - - if (inf = str.match(/^(-)?inf/i)) - (inf[1] ? -1.0 : 1.0) / 0.0 - else - Float(str) + if str + if (inf = str.match(/^(-)?inf/i)) + (inf[1] ? -1.0 : 1.0) / 0.0 + else + Float(str) + end end } - end - def _floatify_pairs + FloatifyPairs = lambda { |array| - return unless array - - array.each_slice(2).map do |member, score| - [member, _floatify.call(score)] + if array + array.each_slice(2).map do |member, score| + [member, Floatify.call(score)] + end end } - end - def _pairify(array) - array.each_slice(2).to_a - end - - def _identity - lambda { |value| - value - } - end - - def _subscription(method, channels, block) + def _subscription(method, timeout, channels, block) return @client.call([method] + channels) if subscribed? begin original, @client = @client, SubscribedClient.new(@client) - @client.send(method, *channels, &block) + if timeout > 0 + @client.send(method, timeout, *channels, &block) + else + @client.send(method, *channels, &block) + end ensure @client = original end end