lib/memcache.rb in fiveruns-memcache-client-1.5.0.3 vs lib/memcache.rb in fiveruns-memcache-client-1.5.0.4

- old
+ new

@@ -16,15 +16,14 @@ end rescue LoadError => e puts "Loading with slow CRC32 ITU-T implementation: #{e.message}" def crc32_ITU_T - n = length r = 0xFFFFFFFF - n.times do |i| - r ^= self[i] + each_byte do |i| + r ^= i 8.times do if (r & 1) != 0 then r = (r>>1) ^ 0xEDB88320 else r >>= 1 @@ -98,11 +97,11 @@ # omitted. See +servers=+ for acceptable server list arguments. # # Valid options for +opts+ are: # # [:namespace] Prepends this value to all keys added or retrieved. - # [:readonly] Raises an exeception on cache writes when true. + # [:readonly] Raises an exception on cache writes when true. # [:multithread] Wraps cache access in a Mutex for thread safety. # # Other options are ignored. def initialize(*args) @@ -161,11 +160,11 @@ # can be either strings of the form "hostname:port" or # "hostname:port:weight" or MemCache::Server objects. def servers=(servers) # Create the server objects. - @servers = servers.collect do |server| + @servers = Array(servers).collect do |server| case server when String host, port, weight = server.split ':', 3 port ||= DEFAULT_PORT weight ||= DEFAULT_WEIGHT @@ -196,11 +195,11 @@ raise MemCacheError, "Update of readonly cache" if @readonly with_server(key) do |server, cache_key| cache_decr server, cache_key, amount end rescue TypeError => err - handle_error server, err + handle_error nil, err end ## # Retrieves +key+ from memcache. If +raw+ is false, the value will be # unmarshalled. @@ -211,11 +210,11 @@ return nil if value.nil? value = Marshal.load value unless raw return value end rescue TypeError => err - handle_error server, err + handle_error nil, err end ## # Retrieves multiple values from memcached in parallel, if possible. # @@ -247,21 +246,21 @@ server_keys[server] << cache_key end results = {} - server_keys.each do |server, keys| - keys = keys.join ' ' - values = cache_get_multi server, keys + server_keys.each do |server, keys_for_server| + keys_for_server = keys_for_server.join ' ' + values = cache_get_multi server, keys_for_server values.each do |key, value| results[cache_keys[key]] = Marshal.load value end end return results - rescue TypeError => err - handle_error server, err + rescue TypeError, IndexError => err + handle_error nil, err end ## # Increments the value for +key+ by +amount+ and returns the new value. # +key+ must already exist. If +key+ is not an integer, it is assumed to be @@ -271,13 +270,13 @@ raise MemCacheError, "Update of readonly cache" if @readonly with_server(key) do |server, cache_key| cache_incr server, cache_key, amount end rescue TypeError => err - handle_error server, err + handle_error nil, err end - + ## # Add +key+ to the cache with value +value+ that expires in +expiry+ # seconds. If +raw+ is true, +value+ will not be Marshalled. # # Warning: Readers should not call this method in the event of a cache miss; @@ -291,19 +290,18 @@ command = "set #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n" with_socket_management(server) do |socket| socket.write command result = socket.gets - if result.nil? + raise_on_error_response! result + + if result.nil? server.close raise MemCacheError, "lost connection to #{server.host}:#{server.port}" end - if result =~ /^SERVER_ERROR (.*)/ - server.close - raise MemCacheError, $1.strip - end + result end end end ## @@ -320,43 +318,51 @@ value = Marshal.dump value unless raw command = "add #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n" with_socket_management(server) do |socket| socket.write command - socket.gets + result = socket.gets + raise_on_error_response! result + result end end end - + ## # Removes +key+ from the cache in +expiry+ seconds. def delete(key, expiry = 0) raise MemCacheError, "Update of readonly cache" if @readonly - server, cache_key = request_setup key - - with_socket_management(server) do |socket| - socket.write "delete #{cache_key} #{expiry}\r\n" - socket.gets + with_server(key) do |server, cache_key| + with_socket_management(server) do |socket| + socket.write "delete #{cache_key} #{expiry}\r\n" + result = socket.gets + raise_on_error_response! result + result + end end end ## # Flush the cache from all memcache servers. def flush_all raise MemCacheError, 'No active servers' unless active? raise MemCacheError, "Update of readonly cache" if @readonly + begin @mutex.lock if @multithread @servers.each do |server| with_socket_management(server) do |socket| socket.write "flush_all\r\n" result = socket.gets - raise MemCacheError, $2.strip if result =~ /^(SERVER_)?ERROR(.*)/ + raise_on_error_response! result + result end end + rescue IndexError => err + handle_error nil, err ensure @mutex.unlock if @multithread end end @@ -405,27 +411,29 @@ raise MemCacheError, "No active servers" unless active? server_stats = {} @servers.each do |server| next unless server.alive? + with_socket_management(server) do |socket| - value = nil # TODO: why is this line here? + value = nil socket.write "stats\r\n" stats = {} while line = socket.gets do + raise_on_error_response! line break if line == "END\r\n" - if line =~ /^STAT ([\w]+) ([\w\.\:]+)/ then + if line =~ /\ASTAT ([\w]+) ([\w\.\:]+)/ then name, value = $1, $2 stats[name] = case name when 'version' value when 'rusage_user', 'rusage_system' then seconds, microseconds = value.split(/:/, 2) microseconds ||= 0 Float(seconds) + (Float(microseconds) / 1_000_000) else - if value =~ /^\d+$/ then + if value =~ /\A\d+\Z/ then value.to_i else value end end @@ -433,10 +441,11 @@ end server_stats["#{server.host}:#{server.port}"] = stats end end + raise MemCacheError, "No active servers" if server_stats.empty? server_stats end ## # Shortcut to get a value from the cache. @@ -476,11 +485,11 @@ return @servers.first if @servers.length == 1 hkey = hash_for key 20.times do |try| - server = @buckets[hkey % @buckets.nitems] + server = @buckets[hkey % @buckets.compact.size] return server if server.alive? hkey += hash_for "#{try}#{key}" end raise MemCacheError, "No servers available" @@ -500,10 +509,11 @@ def cache_decr(server, cache_key, amount) with_socket_management(server) do |socket| socket.write "decr #{cache_key} #{amount}\r\n" text = socket.gets + raise_on_error_response! text return nil if text == "NOT_FOUND\r\n" return text.to_i end end @@ -516,13 +526,14 @@ socket.write "get #{cache_key}\r\n" keyline = socket.gets # "VALUE <key> <flags> <bytes>\r\n" if keyline.nil? then server.close - raise MemCacheError, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too + raise MemCacheError, "lost connection to #{server.host}:#{server.port}" end + raise_on_error_response! keyline return nil if keyline == "END\r\n" unless keyline =~ /(\d+)\r/ then server.close raise MemCacheError, "unexpected response #{keyline.inspect}" @@ -542,12 +553,13 @@ values = {} socket.write "get #{cache_keys}\r\n" while keyline = socket.gets do return values if keyline == "END\r\n" + raise_on_error_response! keyline - unless keyline =~ /^VALUE (.+) (.+) (.+)/ then + unless keyline =~ /\AVALUE (.+) (.+) (.+)/ then server.close raise MemCacheError, "unexpected response #{keyline.inspect}" end key, data_length = $1, $3 @@ -566,35 +578,40 @@ def cache_incr(server, cache_key, amount) with_socket_management(server) do |socket| socket.write "incr #{cache_key} #{amount}\r\n" text = socket.gets + raise_on_error_response! text return nil if text == "NOT_FOUND\r\n" return text.to_i end end - + ## # Gets or creates a socket connected to the given server, and yields it - # to the block. If a socket error (SocketError, SystemCallError, IOError) - # or protocol error (MemCacheError) is raised by the block, closes the - # socket, attempts to connect again, and retries the block (once). If - # an error is again raised, reraises it as MemCacheError. + # to the block, wrapped in a mutex synchronization if @multithread is true. + # + # If a socket error (SocketError, SystemCallError, IOError) or protocol error + # (MemCacheError) is raised by the block, closes the socket, attempts to + # connect again, and retries the block (once). If an error is again raised, + # reraises it as MemCacheError. + # # If unable to connect to the server (or if in the reconnect wait period), - # raises MemCacheError - note that the socket connect code marks a server + # raises MemCacheError. Note that the socket connect code marks a server # dead for a timeout period, so retrying does not apply to connection attempt - # failures (but does still apply to unexpectedly lost connections etc.). - # Wraps the whole lot in mutex synchronization if @multithread is true. + # failures (but does still apply to unexpectedly lost connections etc.). def with_socket_management(server, &block) @mutex.lock if @multithread retried = false begin socket = server.socket - # Raise an IndexError to show this server is out of whack. - # We'll catch it in higher-level code and attempt to restart the operation. + + # Raise an IndexError to show this server is out of whack. If were inside + # a with_server block, we'll catch it and attempt to restart the operation. raise IndexError, "No connection to server (#{server.status})" if socket.nil? + block.call(socket) rescue MemCacheError, SocketError, SystemCallError, IOError => err handle_error(server, err) if retried || socket.nil? retried = true retry @@ -608,11 +625,11 @@ begin server, cache_key = request_setup(key) yield server, cache_key rescue IndexError => e if !retried && @servers.size > 1 - puts "Connection to server #{server.inspect} DIED! Retrying operation..." + puts "Connection to server #{server.inspect} DIED! Retrying operation..." retried = true retry end handle_error(nil, e) end @@ -636,9 +653,15 @@ def request_setup(key) raise MemCacheError, 'No active servers' unless active? cache_key = make_cache_key key server = get_server_for_key cache_key return server, cache_key + end + + def raise_on_error_response!(response) + if response =~ /\A(?:CLIENT_|SERVER_)?ERROR(.*)/ + raise MemCacheError, $1.strip + end end ## # This class represents a memcached server instance.