lib/redis.rb in redis-3.2.2 vs lib/redis.rb in redis-3.3.0
- old
+ new
@@ -47,10 +47,11 @@
#
# @return [Redis] a new client instance
def initialize(options = {})
@options = options.dup
@original_client = @client = Client.new(options)
+ @queue = Hash.new { |h, k| h[k] = [] }
super() # Monitor#initialize
end
def synchronize
@@ -73,14 +74,53 @@
def connected?
@original_client.connected?
end
# Disconnect the client as quickly and silently as possible.
- def disconnect!
+ def close
@original_client.disconnect
end
+ alias disconnect! close
+ # Sends a command to Redis and returns its reply.
+ #
+ # Replies are converted to Ruby objects according to the RESP protocol, so
+ # you can expect a Ruby array, integer or nil when Redis sends one. Higher
+ # level transformations, such as converting an array of pairs into a Ruby
+ # hash, are up to consumers.
+ #
+ # Redis error replies are raised as Ruby exceptions.
+ def call(*command)
+ synchronize do |client|
+ client.call(command)
+ end
+ end
+
+ # Queues a command for pipelining.
+ #
+ # Commands in the queue are executed with the Redis#commit method.
+ #
+ # See http://redis.io/topics/pipelining for more details.
+ #
+ def queue(*command)
+ @queue[Thread.current.object_id] << command
+ end
+
+ # Sends all commands in the queue.
+ #
+ # See http://redis.io/topics/pipelining for more details.
+ #
+ def commit
+ synchronize do |client|
+ begin
+ client.call_pipelined(@queue[Thread.current.object_id])
+ ensure
+ @queue.delete(Thread.current.object_id)
+ end
+ end
+ end
+
# Authenticate to the server.
#
# @param [String] password must match the password specified in the
# `requirepass` directive in the configuration file
# @return [String] `OK`
@@ -159,11 +199,11 @@
# property with `CONFIG GET`
def config(action, *args)
synchronize do |client|
client.call([:config, action] + args) do |reply|
if reply.kind_of?(Array) && action == :get
- Hash[_pairify(reply)]
+ Hashify.call(reply)
else
reply
end
end
end
@@ -318,33 +358,33 @@
#
# @param [String] key
# @return [Boolean] whether the timeout was removed or not
def persist(key)
synchronize do |client|
- client.call([:persist, key], &_boolify)
+ client.call([:persist, key], &Boolify)
end
end
# Set a key's time to live in seconds.
#
# @param [String] key
# @param [Fixnum] seconds time to live
# @return [Boolean] whether the timeout was set or not
def expire(key, seconds)
synchronize do |client|
- client.call([:expire, key, seconds], &_boolify)
+ client.call([:expire, key, seconds], &Boolify)
end
end
# Set the expiration for a key as a UNIX timestamp.
#
# @param [String] key
# @param [Fixnum] unix_time expiry time specified as a UNIX timestamp
# @return [Boolean] whether the timeout was set or not
def expireat(key, unix_time)
synchronize do |client|
- client.call([:expireat, key, unix_time], &_boolify)
+ client.call([:expireat, key, unix_time], &Boolify)
end
end
# Get the time to live (in seconds) for a key.
#
@@ -369,22 +409,22 @@
# @param [String] key
# @param [Fixnum] milliseconds time to live
# @return [Boolean] whether the timeout was set or not
def pexpire(key, milliseconds)
synchronize do |client|
- client.call([:pexpire, key, milliseconds], &_boolify)
+ client.call([:pexpire, key, milliseconds], &Boolify)
end
end
# Set the expiration for a key as number of milliseconds from UNIX Epoch.
#
# @param [String] key
# @param [Fixnum] ms_unix_time expiry time specified as number of milliseconds from UNIX Epoch.
# @return [Boolean] whether the timeout was set or not
def pexpireat(key, ms_unix_time)
synchronize do |client|
- client.call([:pexpireat, key, ms_unix_time], &_boolify)
+ client.call([:pexpireat, key, ms_unix_time], &Boolify)
end
end
# Get the time to live (in milliseconds) for a key.
#
@@ -459,11 +499,11 @@
#
# @param [String] key
# @return [Boolean]
def exists(key)
synchronize do |client|
- client.call([:exists, key], &_boolify)
+ client.call([:exists, key], &Boolify)
end
end
# Find all keys matching the given pattern.
#
@@ -500,11 +540,11 @@
# @param [String] key
# @param [Fixnum] db
# @return [Boolean] whether the key was moved or not
def move(key, db)
synchronize do |client|
- client.call([:move, key, db], &_boolify)
+ client.call([:move, key, db], &Boolify)
end
end
def object(*args)
synchronize do |client|
@@ -537,11 +577,11 @@
# @param [String] old_name
# @param [String] new_name
# @return [Boolean] whether the key was renamed or not
def renamenx(old_name, new_name)
synchronize do |client|
- client.call([:renamenx, old_name, new_name], &_boolify)
+ client.call([:renamenx, old_name, new_name], &Boolify)
end
end
# Sort the elements in a list, set or sorted set.
#
@@ -676,11 +716,11 @@
# @param [String] key
# @param [Float] increment
# @return [Float] value after incrementing it
def incrbyfloat(key, increment)
synchronize do |client|
- client.call([:incrbyfloat, key, increment], &_floatify)
+ client.call([:incrbyfloat, key, increment], &Floatify)
end
end
# Set the string value of a key.
#
@@ -707,11 +747,11 @@
xx = options[:xx]
args.concat(["XX"]) if xx
synchronize do |client|
if nx || xx
- client.call([:set, key, value.to_s] + args, &_boolify_set)
+ client.call([:set, key, value.to_s] + args, &BoolifySet)
else
client.call([:set, key, value.to_s] + args)
end
end
end
@@ -747,11 +787,11 @@
# @param [String] key
# @param [String] value
# @return [Boolean] whether the key was set or not
def setnx(key, value)
synchronize do |client|
- client.call([:setnx, key, value.to_s], &_boolify)
+ client.call([:setnx, key, value.to_s], &Boolify)
end
end
# Set one or more values.
#
@@ -793,11 +833,11 @@
# @return [Boolean] whether or not all values were set
#
# @see #mapped_msetnx
def msetnx(*args)
synchronize do |client|
- client.call([:msetnx] + args, &_boolify)
+ client.call([:msetnx] + args, &Boolify)
end
end
# Set one or more values, only if none of the keys exist.
#
@@ -842,11 +882,11 @@
end
# Get the values of all the given keys.
#
# @example
- # redis.mapped_mget("key1", "key1")
+ # redis.mapped_mget("key1", "key2")
# # => { "key1" => "v1", "key2" => "v2" }
#
# @param [Array<String>] keys array of keys
# @return [Hash] a hash mapping the specified keys to their values
#
@@ -1266,11 +1306,11 @@
if member.is_a? Array
# Variadic: return integer
reply
else
# Single argument: return boolean
- _boolify.call(reply)
+ Boolify.call(reply)
end
end
end
end
@@ -1288,11 +1328,11 @@
if member.is_a? Array
# Variadic: return integer
reply
else
# Single argument: return boolean
- _boolify.call(reply)
+ Boolify.call(reply)
end
end
end
end
@@ -1327,22 +1367,22 @@
# @param [String] destination destination key
# @param [String] member member to move from `source` to `destination`
# @return [Boolean]
def smove(source, destination, member)
synchronize do |client|
- client.call([:smove, source, destination, member], &_boolify)
+ client.call([:smove, source, destination, member], &Boolify)
end
end
# Determine if a given value is a member of a set.
#
# @param [String] key
# @param [String] member
# @return [Boolean]
def sismember(key, member)
synchronize do |client|
- client.call([:sismember, key, member], &_boolify)
+ client.call([:sismember, key, member], &Boolify)
end
end
# Get all the members in a set.
#
@@ -1481,14 +1521,14 @@
end
synchronize do |client|
if args.size == 1 && args[0].is_a?(Array)
# Variadic: return float if INCR, integer if !INCR
- client.call([:zadd, key] + zadd_options + args[0], &(incr ? _floatify : _identity))
+ client.call([:zadd, key] + zadd_options + args[0], &(incr ? Floatify : nil))
elsif args.size == 2
# Single pair: return float if INCR, boolean if !INCR
- client.call([:zadd, key] + zadd_options + args, &(incr ? _floatify : _boolify))
+ client.call([:zadd, key] + zadd_options + args, &(incr ? Floatify : Boolify))
else
raise ArgumentError, "wrong number of arguments"
end
end
end
@@ -1503,11 +1543,11 @@
# @param [Float] increment
# @param [String] member
# @return [Float] score of the member after incrementing it
def zincrby(key, increment, member)
synchronize do |client|
- client.call([:zincrby, key, increment, member], &_floatify)
+ client.call([:zincrby, key, increment, member], &Floatify)
end
end
# Remove one or more members from a sorted set.
#
@@ -1532,11 +1572,11 @@
if member.is_a? Array
# Variadic: return integer
reply
else
# Single argument: return boolean
- _boolify.call(reply)
+ Boolify.call(reply)
end
end
end
end
@@ -1549,11 +1589,11 @@
# @param [String] key
# @param [String] member
# @return [Float] score of the member
def zscore(key, member)
synchronize do |client|
- client.call([:zscore, key, member], &_floatify)
+ client.call([:zscore, key, member], &Floatify)
end
end
# Return a range of members in a sorted set, by index.
#
@@ -1578,11 +1618,11 @@
with_scores = options[:with_scores] || options[:withscores]
if with_scores
args << "WITHSCORES"
- block = _floatify_pairs
+ block = FloatifyPairs
end
synchronize do |client|
client.call([:zrange, key, start, stop] + args, &block)
end
@@ -1604,11 +1644,11 @@
with_scores = options[:with_scores] || options[:withscores]
if with_scores
args << "WITHSCORES"
- block = _floatify_pairs
+ block = FloatifyPairs
end
synchronize do |client|
client.call([:zrevrange, key, start, stop] + args, &block)
end
@@ -1742,11 +1782,11 @@
with_scores = options[:with_scores] || options[:withscores]
if with_scores
args << "WITHSCORES"
- block = _floatify_pairs
+ block = FloatifyPairs
end
limit = options[:limit]
args.concat(["LIMIT"] + limit) if limit
@@ -1774,11 +1814,11 @@
with_scores = options[:with_scores] || options[:withscores]
if with_scores
args << ["WITHSCORES"]
- block = _floatify_pairs
+ block = FloatifyPairs
end
limit = options[:limit]
args.concat(["LIMIT"] + limit) if limit
@@ -1904,11 +1944,11 @@
# @param [String] field
# @param [String] value
# @return [Boolean] whether or not the field was **added** to the hash
def hset(key, field, value)
synchronize do |client|
- client.call([:hset, key, field, value], &_boolify)
+ client.call([:hset, key, field, value], &Boolify)
end
end
# Set the value of a hash field, only if the field does not exist.
#
@@ -1916,11 +1956,11 @@
# @param [String] field
# @param [String] value
# @return [Boolean] whether or not the field was **added** to the hash
def hsetnx(key, field, value)
synchronize do |client|
- client.call([:hsetnx, key, field, value], &_boolify)
+ client.call([:hsetnx, key, field, value], &Boolify)
end
end
# Set one or more hash values.
#
@@ -2019,11 +2059,11 @@
# @param [String] key
# @param [String] field
# @return [Boolean] whether or not the field exists in the hash
def hexists(key, field)
synchronize do |client|
- client.call([:hexists, key, field], &_boolify)
+ client.call([:hexists, key, field], &Boolify)
end
end
# Increment the integer value of a hash field by the given integer number.
#
@@ -2043,11 +2083,11 @@
# @param [String] field
# @param [Float] increment
# @return [Float] value of the field after incrementing it
def hincrbyfloat(key, field, increment)
synchronize do |client|
- client.call([:hincrbyfloat, key, field, increment], &_floatify)
+ client.call([:hincrbyfloat, key, field, increment], &Floatify)
end
end
# Get all the fields in a hash.
#
@@ -2073,11 +2113,11 @@
#
# @param [String] key
# @return [Hash<String, String>]
def hgetall(key)
synchronize do |client|
- client.call([:hgetall, key], &_hashify)
+ client.call([:hgetall, key], &Hashify)
end
end
# Post a message to a channel.
def publish(channel, message)
@@ -2093,14 +2133,21 @@
end
# Listen for messages published to the given channels.
def subscribe(*channels, &block)
synchronize do |client|
- _subscription(:subscribe, channels, block)
+ _subscription(:subscribe, 0, channels, block)
end
end
+ # Listen for messages published to the given channels. Throw a timeout error if there is no messages for a timeout period.
+ def subscribe_with_timeout(timeout, *channels, &block)
+ synchronize do |client|
+ _subscription(:subscribe_with_timeout, timeout, channels, block)
+ end
+ end
+
# Stop listening for messages posted to the given channels.
def unsubscribe(*channels)
synchronize do |client|
raise RuntimeError, "Can't unsubscribe if not subscribed." unless subscribed?
client.unsubscribe(*channels)
@@ -2108,14 +2155,21 @@
end
# Listen for messages published to channels matching the given patterns.
def psubscribe(*channels, &block)
synchronize do |client|
- _subscription(:psubscribe, channels, block)
+ _subscription(:psubscribe, 0, channels, block)
end
end
+ # Listen for messages published to channels matching the given patterns. Throw a timeout error if there is no messages for a timeout period.
+ def psubscribe_with_timeout(timeout, *channels, &block)
+ synchronize do |client|
+ _subscription(:psubscribe_with_timeout, timeout, channels, block)
+ end
+ end
+
# Stop listening for messages posted to channels matching the given patterns.
def punsubscribe(*channels)
synchronize do |client|
raise RuntimeError, "Can't unsubscribe if not subscribed." unless subscribed?
client.punsubscribe(*channels)
@@ -2310,11 +2364,11 @@
if subcommand == "exists"
synchronize do |client|
arg = args.first
client.call([:script, :exists, arg]) do |reply|
- reply = reply.map { |r| _boolify.call(r) }
+ reply = reply.map { |r| Boolify.call(r) }
if arg.is_a?(Array)
reply
else
reply.first
@@ -2464,11 +2518,11 @@
# - `:count => Integer`: return count keys at most per iteration
#
# @return [String, Array<[String, String]>] the next cursor and all found keys
def hscan(key, cursor, options={})
_scan(:hscan, cursor, [key], options) do |reply|
- [reply[0], _pairify(reply[1])]
+ [reply[0], reply[1].each_slice(2).to_a]
end
end
# Scan a hash
#
@@ -2503,11 +2557,11 @@
#
# @return [String, Array<[String, Float]>] the next cursor and all found
# members and scores
def zscan(key, cursor, options={})
_scan(:zscan, cursor, [key], options) do |reply|
- [reply[0], _floatify_pairs.call(reply[1])]
+ [reply[0], FloatifyPairs.call(reply[1])]
end
end
# Scan a sorted set
#
@@ -2571,11 +2625,11 @@
# @param [String] key
# @param [String, Array<String>] member one member, or array of members
# @return [Boolean] true if at least 1 HyperLogLog internal register was altered. false otherwise.
def pfadd(key, member)
synchronize do |client|
- client.call([:pfadd, key, member], &_boolify)
+ client.call([:pfadd, key, member], &Boolify)
end
end
# Get the approximate cardinality of members added to HyperLogLog structure.
#
@@ -2596,11 +2650,11 @@
# @param [String] dest_key destination key
# @param [String, Array<String>] source_key source key, or array of keys
# @return [Boolean]
def pfmerge(dest_key, *source_key)
synchronize do |client|
- client.call([:pfmerge, dest_key, *source_key], &_boolify_set)
+ client.call([:pfmerge, dest_key, *source_key], &BoolifySet)
end
end
# Interact with the sentinel command (masters, master, slaves, failover)
#
@@ -2615,13 +2669,13 @@
when "get-master-addr-by-name"
reply
else
if reply.kind_of?(Array)
if reply[0].kind_of?(Array)
- reply.map(&_hashify)
+ reply.map(&Hashify)
else
- _hashify.call(reply)
+ Hashify.call(reply)
end
else
reply
end
end
@@ -2650,73 +2704,62 @@
private
# Commands returning 1 for true and 0 for false may be executed in a pipeline
# where the method call will return nil. Propagate the nil instead of falsely
# returning false.
- def _boolify
+ Boolify =
lambda { |value|
value == 1 if value
}
- end
- def _boolify_set
+ BoolifySet =
lambda { |value|
if value && "OK" == value
true
else
false
end
}
- end
- def _hashify
+ Hashify =
lambda { |array|
hash = Hash.new
array.each_slice(2) do |field, value|
hash[field] = value
end
hash
}
- end
- def _floatify
+ Floatify =
lambda { |str|
- return unless str
-
- if (inf = str.match(/^(-)?inf/i))
- (inf[1] ? -1.0 : 1.0) / 0.0
- else
- Float(str)
+ if str
+ if (inf = str.match(/^(-)?inf/i))
+ (inf[1] ? -1.0 : 1.0) / 0.0
+ else
+ Float(str)
+ end
end
}
- end
- def _floatify_pairs
+ FloatifyPairs =
lambda { |array|
- return unless array
-
- array.each_slice(2).map do |member, score|
- [member, _floatify.call(score)]
+ if array
+ array.each_slice(2).map do |member, score|
+ [member, Floatify.call(score)]
+ end
end
}
- end
- def _pairify(array)
- array.each_slice(2).to_a
- end
-
- def _identity
- lambda { |value|
- value
- }
- end
-
- def _subscription(method, channels, block)
+ def _subscription(method, timeout, channels, block)
return @client.call([method] + channels) if subscribed?
begin
original, @client = @client, SubscribedClient.new(@client)
- @client.send(method, *channels, &block)
+ if timeout > 0
+ @client.send(method, timeout, *channels, &block)
+ else
+ @client.send(method, *channels, &block)
+ end
ensure
@client = original
end
end