lib/memcache.rb in fiveruns-memcache-client-1.5.0.3 vs lib/memcache.rb in fiveruns-memcache-client-1.5.0.4
- old
+ new
@@ -16,15 +16,14 @@
end
rescue LoadError => e
puts "Loading with slow CRC32 ITU-T implementation: #{e.message}"
def crc32_ITU_T
- n = length
r = 0xFFFFFFFF
- n.times do |i|
- r ^= self[i]
+ each_byte do |i|
+ r ^= i
8.times do
if (r & 1) != 0 then
r = (r>>1) ^ 0xEDB88320
else
r >>= 1
@@ -98,11 +97,11 @@
# omitted. See +servers=+ for acceptable server list arguments.
#
# Valid options for +opts+ are:
#
# [:namespace] Prepends this value to all keys added or retrieved.
- # [:readonly] Raises an exeception on cache writes when true.
+ # [:readonly] Raises an exception on cache writes when true.
# [:multithread] Wraps cache access in a Mutex for thread safety.
#
# Other options are ignored.
def initialize(*args)
@@ -161,11 +160,11 @@
# can be either strings of the form "hostname:port" or
# "hostname:port:weight" or MemCache::Server objects.
def servers=(servers)
# Create the server objects.
- @servers = servers.collect do |server|
+ @servers = Array(servers).collect do |server|
case server
when String
host, port, weight = server.split ':', 3
port ||= DEFAULT_PORT
weight ||= DEFAULT_WEIGHT
@@ -196,11 +195,11 @@
raise MemCacheError, "Update of readonly cache" if @readonly
with_server(key) do |server, cache_key|
cache_decr server, cache_key, amount
end
rescue TypeError => err
- handle_error server, err
+ handle_error nil, err
end
##
# Retrieves +key+ from memcache. If +raw+ is false, the value will be
# unmarshalled.
@@ -211,11 +210,11 @@
return nil if value.nil?
value = Marshal.load value unless raw
return value
end
rescue TypeError => err
- handle_error server, err
+ handle_error nil, err
end
##
# Retrieves multiple values from memcached in parallel, if possible.
#
@@ -247,21 +246,21 @@
server_keys[server] << cache_key
end
results = {}
- server_keys.each do |server, keys|
- keys = keys.join ' '
- values = cache_get_multi server, keys
+ server_keys.each do |server, keys_for_server|
+ keys_for_server = keys_for_server.join ' '
+ values = cache_get_multi server, keys_for_server
values.each do |key, value|
results[cache_keys[key]] = Marshal.load value
end
end
return results
- rescue TypeError => err
- handle_error server, err
+ rescue TypeError, IndexError => err
+ handle_error nil, err
end
##
# Increments the value for +key+ by +amount+ and returns the new value.
# +key+ must already exist. If +key+ is not an integer, it is assumed to be
@@ -271,13 +270,13 @@
raise MemCacheError, "Update of readonly cache" if @readonly
with_server(key) do |server, cache_key|
cache_incr server, cache_key, amount
end
rescue TypeError => err
- handle_error server, err
+ handle_error nil, err
end
-
+
##
# Add +key+ to the cache with value +value+ that expires in +expiry+
# seconds. If +raw+ is true, +value+ will not be Marshalled.
#
# Warning: Readers should not call this method in the event of a cache miss;
@@ -291,19 +290,18 @@
command = "set #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n"
with_socket_management(server) do |socket|
socket.write command
result = socket.gets
- if result.nil?
+ raise_on_error_response! result
+
+ if result.nil?
server.close
raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
end
- if result =~ /^SERVER_ERROR (.*)/
- server.close
- raise MemCacheError, $1.strip
- end
+ result
end
end
end
##
@@ -320,43 +318,51 @@
value = Marshal.dump value unless raw
command = "add #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n"
with_socket_management(server) do |socket|
socket.write command
- socket.gets
+ result = socket.gets
+ raise_on_error_response! result
+ result
end
end
end
-
+
##
# Removes +key+ from the cache in +expiry+ seconds.
def delete(key, expiry = 0)
raise MemCacheError, "Update of readonly cache" if @readonly
- server, cache_key = request_setup key
-
- with_socket_management(server) do |socket|
- socket.write "delete #{cache_key} #{expiry}\r\n"
- socket.gets
+ with_server(key) do |server, cache_key|
+ with_socket_management(server) do |socket|
+ socket.write "delete #{cache_key} #{expiry}\r\n"
+ result = socket.gets
+ raise_on_error_response! result
+ result
+ end
end
end
##
# Flush the cache from all memcache servers.
def flush_all
raise MemCacheError, 'No active servers' unless active?
raise MemCacheError, "Update of readonly cache" if @readonly
+
begin
@mutex.lock if @multithread
@servers.each do |server|
with_socket_management(server) do |socket|
socket.write "flush_all\r\n"
result = socket.gets
- raise MemCacheError, $2.strip if result =~ /^(SERVER_)?ERROR(.*)/
+ raise_on_error_response! result
+ result
end
end
+ rescue IndexError => err
+ handle_error nil, err
ensure
@mutex.unlock if @multithread
end
end
@@ -405,27 +411,29 @@
raise MemCacheError, "No active servers" unless active?
server_stats = {}
@servers.each do |server|
next unless server.alive?
+
with_socket_management(server) do |socket|
- value = nil # TODO: why is this line here?
+ value = nil
socket.write "stats\r\n"
stats = {}
while line = socket.gets do
+ raise_on_error_response! line
break if line == "END\r\n"
- if line =~ /^STAT ([\w]+) ([\w\.\:]+)/ then
+ if line =~ /\ASTAT ([\w]+) ([\w\.\:]+)/ then
name, value = $1, $2
stats[name] = case name
when 'version'
value
when 'rusage_user', 'rusage_system' then
seconds, microseconds = value.split(/:/, 2)
microseconds ||= 0
Float(seconds) + (Float(microseconds) / 1_000_000)
else
- if value =~ /^\d+$/ then
+ if value =~ /\A\d+\Z/ then
value.to_i
else
value
end
end
@@ -433,10 +441,11 @@
end
server_stats["#{server.host}:#{server.port}"] = stats
end
end
+ raise MemCacheError, "No active servers" if server_stats.empty?
server_stats
end
##
# Shortcut to get a value from the cache.
@@ -476,11 +485,11 @@
return @servers.first if @servers.length == 1
hkey = hash_for key
20.times do |try|
- server = @buckets[hkey % @buckets.nitems]
+ server = @buckets[hkey % @buckets.compact.size]
return server if server.alive?
hkey += hash_for "#{try}#{key}"
end
raise MemCacheError, "No servers available"
@@ -500,10 +509,11 @@
def cache_decr(server, cache_key, amount)
with_socket_management(server) do |socket|
socket.write "decr #{cache_key} #{amount}\r\n"
text = socket.gets
+ raise_on_error_response! text
return nil if text == "NOT_FOUND\r\n"
return text.to_i
end
end
@@ -516,13 +526,14 @@
socket.write "get #{cache_key}\r\n"
keyline = socket.gets # "VALUE <key> <flags> <bytes>\r\n"
if keyline.nil? then
server.close
- raise MemCacheError, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too
+ raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
end
+ raise_on_error_response! keyline
return nil if keyline == "END\r\n"
unless keyline =~ /(\d+)\r/ then
server.close
raise MemCacheError, "unexpected response #{keyline.inspect}"
@@ -542,12 +553,13 @@
values = {}
socket.write "get #{cache_keys}\r\n"
while keyline = socket.gets do
return values if keyline == "END\r\n"
+ raise_on_error_response! keyline
- unless keyline =~ /^VALUE (.+) (.+) (.+)/ then
+ unless keyline =~ /\AVALUE (.+) (.+) (.+)/ then
server.close
raise MemCacheError, "unexpected response #{keyline.inspect}"
end
key, data_length = $1, $3
@@ -566,35 +578,40 @@
def cache_incr(server, cache_key, amount)
with_socket_management(server) do |socket|
socket.write "incr #{cache_key} #{amount}\r\n"
text = socket.gets
+ raise_on_error_response! text
return nil if text == "NOT_FOUND\r\n"
return text.to_i
end
end
-
+
##
# Gets or creates a socket connected to the given server, and yields it
- # to the block. If a socket error (SocketError, SystemCallError, IOError)
- # or protocol error (MemCacheError) is raised by the block, closes the
- # socket, attempts to connect again, and retries the block (once). If
- # an error is again raised, reraises it as MemCacheError.
+ # to the block, wrapped in a mutex synchronization if @multithread is true.
+ #
+ # If a socket error (SocketError, SystemCallError, IOError) or protocol error
+ # (MemCacheError) is raised by the block, closes the socket, attempts to
+ # connect again, and retries the block (once). If an error is again raised,
+ # reraises it as MemCacheError.
+ #
# If unable to connect to the server (or if in the reconnect wait period),
- # raises MemCacheError - note that the socket connect code marks a server
+ # raises MemCacheError. Note that the socket connect code marks a server
# dead for a timeout period, so retrying does not apply to connection attempt
- # failures (but does still apply to unexpectedly lost connections etc.).
- # Wraps the whole lot in mutex synchronization if @multithread is true.
+ # failures (but does still apply to unexpectedly lost connections etc.).
def with_socket_management(server, &block)
@mutex.lock if @multithread
retried = false
begin
socket = server.socket
- # Raise an IndexError to show this server is out of whack.
- # We'll catch it in higher-level code and attempt to restart the operation.
+
+ # Raise an IndexError to show this server is out of whack. If were inside
+ # a with_server block, we'll catch it and attempt to restart the operation.
raise IndexError, "No connection to server (#{server.status})" if socket.nil?
+
block.call(socket)
rescue MemCacheError, SocketError, SystemCallError, IOError => err
handle_error(server, err) if retried || socket.nil?
retried = true
retry
@@ -608,11 +625,11 @@
begin
server, cache_key = request_setup(key)
yield server, cache_key
rescue IndexError => e
if !retried && @servers.size > 1
- puts "Connection to server #{server.inspect} DIED! Retrying operation..."
+ puts "Connection to server #{server.inspect} DIED! Retrying operation..."
retried = true
retry
end
handle_error(nil, e)
end
@@ -636,9 +653,15 @@
def request_setup(key)
raise MemCacheError, 'No active servers' unless active?
cache_key = make_cache_key key
server = get_server_for_key cache_key
return server, cache_key
+ end
+
+ def raise_on_error_response!(response)
+ if response =~ /\A(?:CLIENT_|SERVER_)?ERROR(.*)/
+ raise MemCacheError, $1.strip
+ end
end
##
# This class represents a memcached server instance.