lib/memcache.rb in fiveruns-memcache-client-1.5.0.1 vs lib/memcache.rb in fiveruns-memcache-client-1.5.0.3

- old
+ new

@@ -7,29 +7,37 @@ class String ## # Uses the ITU-T polynomial in the CRC32 algorithm. + begin + require 'crc32' + def crc32_ITU_T + CRC32.itu_t(self) + end + rescue LoadError => e + puts "Loading with slow CRC32 ITU-T implementation: #{e.message}" + + def crc32_ITU_T + n = length + r = 0xFFFFFFFF - def crc32_ITU_T - n = length - r = 0xFFFFFFFF - - n.times do |i| - r ^= self[i] - 8.times do - if (r & 1) != 0 then - r = (r>>1) ^ 0xEDB88320 - else - r >>= 1 + n.times do |i| + r ^= self[i] + 8.times do + if (r & 1) != 0 then + r = (r>>1) ^ 0xEDB88320 + else + r >>= 1 + end end end - end - r ^ 0xFFFFFFFF + r ^ 0xFFFFFFFF + end end - + end ## # A Ruby client library for memcached. # @@ -178,45 +186,35 @@ server.weight.times { @buckets.push(server) } end end ## - # Deceremets the value for +key+ by +amount+ and returns the new value. + # Decrements the value for +key+ by +amount+ and returns the new value. # +key+ must already exist. If +key+ is not an integer, it is assumed to be # 0. +key+ can not be decremented below 0. def decr(key, amount = 1) - server, cache_key = request_setup key - - if @multithread then - threadsafe_cache_decr server, cache_key, amount - else + raise MemCacheError, "Update of readonly cache" if @readonly + with_server(key) do |server, cache_key| cache_decr server, cache_key, amount end - rescue TypeError, SocketError, SystemCallError, IOError => err + rescue TypeError => err handle_error server, err end ## # Retrieves +key+ from memcache. If +raw+ is false, the value will be # unmarshalled. def get(key, raw = false) - server, cache_key = request_setup key - - value = if @multithread then - threadsafe_cache_get server, cache_key - else - cache_get server, cache_key - end - - return nil if value.nil? - - value = Marshal.load value unless raw - - return value - rescue TypeError, SocketError, SystemCallError, IOError => err + with_server(key) do |server, cache_key| + value = cache_get server, cache_key + return nil if value.nil? + value = Marshal.load value unless raw + return value + end + rescue TypeError => err handle_error server, err end ## # Retrieves multiple values from memcached in parallel, if possible. @@ -251,75 +249,62 @@ results = {} server_keys.each do |server, keys| keys = keys.join ' ' - values = if @multithread then - threadsafe_cache_get_multi server, keys - else - cache_get_multi server, keys - end + values = cache_get_multi server, keys values.each do |key, value| results[cache_keys[key]] = Marshal.load value end end return results - rescue TypeError, SocketError, SystemCallError, IOError => err + rescue TypeError => err handle_error server, err end ## - # Increments the value for +key+ by +amount+ and retruns the new value. + # Increments the value for +key+ by +amount+ and returns the new value. # +key+ must already exist. If +key+ is not an integer, it is assumed to be # 0. def incr(key, amount = 1) - server, cache_key = request_setup key - - if @multithread then - threadsafe_cache_incr server, cache_key, amount - else + raise MemCacheError, "Update of readonly cache" if @readonly + with_server(key) do |server, cache_key| cache_incr server, cache_key, amount end - rescue TypeError, SocketError, SystemCallError, IOError => err + rescue TypeError => err handle_error server, err end - + ## # Add +key+ to the cache with value +value+ that expires in +expiry+ # seconds. If +raw+ is true, +value+ will not be Marshalled. # # Warning: Readers should not call this method in the event of a cache miss; # see MemCache#add. def set(key, value, expiry = 0, raw = false) raise MemCacheError, "Update of readonly cache" if @readonly - server, cache_key = request_setup key - socket = server.socket + with_server(key) do |server, cache_key| - value = Marshal.dump value unless raw - command = "set #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n" + value = Marshal.dump value unless raw + command = "set #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n" - begin - @mutex.lock if @multithread - socket.write command - result = socket.gets - if result.nil? - server.close - raise MemCacheError, "lost connection to #{server.host}:#{server.port}" - end + with_socket_management(server) do |socket| + socket.write command + result = socket.gets + if result.nil? + server.close + raise MemCacheError, "lost connection to #{server.host}:#{server.port}" + end - if result =~ /^SERVER_ERROR (.*)/ - server.close - raise MemCacheError, $1.strip - end - rescue SocketError, SystemCallError, IOError => err - server.close - raise MemCacheError, err.message - ensure - @mutex.unlock if @multithread + if result =~ /^SERVER_ERROR (.*)/ + server.close + raise MemCacheError, $1.strip + end + end end end ## # Add +key+ to the cache with value +value+ that expires in +expiry+ @@ -329,50 +314,32 @@ # Readers should call this method in the event of a cache miss, not # MemCache#set or MemCache#[]=. def add(key, value, expiry = 0, raw = false) raise MemCacheError, "Update of readonly cache" if @readonly - server, cache_key = request_setup key - socket = server.socket + with_server(key) do |server, cache_key| + value = Marshal.dump value unless raw + command = "add #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n" - value = Marshal.dump value unless raw - command = "add #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n" - - begin - @mutex.lock if @multithread - socket.write command - socket.gets - rescue SocketError, SystemCallError, IOError => err - server.close - raise MemCacheError, err.message - ensure - @mutex.unlock if @multithread + with_socket_management(server) do |socket| + socket.write command + socket.gets + end end end - + ## # Removes +key+ from the cache in +expiry+ seconds. def delete(key, expiry = 0) - @mutex.lock if @multithread + raise MemCacheError, "Update of readonly cache" if @readonly + server, cache_key = request_setup key - raise MemCacheError, "No active servers" unless active? - cache_key = make_cache_key key - server = get_server_for_key cache_key - - sock = server.socket - raise MemCacheError, "No connection to server" if sock.nil? - - begin - sock.write "delete #{cache_key} #{expiry}\r\n" - sock.gets - rescue SocketError, SystemCallError, IOError => err - server.close - raise MemCacheError, err.message + with_socket_management(server) do |socket| + socket.write "delete #{cache_key} #{expiry}\r\n" + socket.gets end - ensure - @mutex.unlock if @multithread end ## # Flush the cache from all memcache servers. @@ -380,19 +347,14 @@ raise MemCacheError, 'No active servers' unless active? raise MemCacheError, "Update of readonly cache" if @readonly begin @mutex.lock if @multithread @servers.each do |server| - begin - sock = server.socket - raise MemCacheError, "No connection to server" if sock.nil? - sock.write "flush_all\r\n" - result = sock.gets + with_socket_management(server) do |socket| + socket.write "flush_all\r\n" + result = socket.gets raise MemCacheError, $2.strip if result =~ /^(SERVER_)?ERROR(.*)/ - rescue SocketError, SystemCallError, IOError => err - server.close - raise MemCacheError, err.message end end ensure @mutex.unlock if @multithread end @@ -442,18 +404,16 @@ def stats raise MemCacheError, "No active servers" unless active? server_stats = {} @servers.each do |server| - sock = server.socket - raise MemCacheError, "No connection to server" if sock.nil? - - value = nil - begin - sock.write "stats\r\n" + next unless server.alive? + with_socket_management(server) do |socket| + value = nil # TODO: why is this line here? + socket.write "stats\r\n" stats = {} - while line = sock.gets do + while line = socket.gets do break if line == "END\r\n" if line =~ /^STAT ([\w]+) ([\w\.\:]+)/ then name, value = $1, $2 stats[name] = case name when 'version' @@ -470,13 +430,10 @@ end end end end server_stats["#{server.host}:#{server.port}"] = stats - rescue SocketError, SystemCallError, IOError => err - server.close - raise MemCacheError, err.message end end server_stats end @@ -540,84 +497,134 @@ ## # Performs a raw decr for +cache_key+ from +server+. Returns nil if not # found. def cache_decr(server, cache_key, amount) - socket = server.socket - socket.write "decr #{cache_key} #{amount}\r\n" - text = socket.gets - return nil if text == "NOT_FOUND\r\n" - return text.to_i + with_socket_management(server) do |socket| + socket.write "decr #{cache_key} #{amount}\r\n" + text = socket.gets + return nil if text == "NOT_FOUND\r\n" + return text.to_i + end end ## # Fetches the raw data for +cache_key+ from +server+. Returns nil on cache # miss. def cache_get(server, cache_key) - socket = server.socket - socket.write "get #{cache_key}\r\n" - keyline = socket.gets # "VALUE <key> <flags> <bytes>\r\n" + with_socket_management(server) do |socket| + socket.write "get #{cache_key}\r\n" + keyline = socket.gets # "VALUE <key> <flags> <bytes>\r\n" - if keyline.nil? then - server.close - raise MemCacheError, "lost connection to #{server.host}:#{server.port}" - end + if keyline.nil? then + server.close + raise MemCacheError, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too + end - return nil if keyline == "END\r\n" + return nil if keyline == "END\r\n" - unless keyline =~ /(\d+)\r/ then - server.close - raise MemCacheError, "unexpected response #{keyline.inspect}" + unless keyline =~ /(\d+)\r/ then + server.close + raise MemCacheError, "unexpected response #{keyline.inspect}" + end + value = socket.read $1.to_i + socket.read 2 # "\r\n" + socket.gets # "END\r\n" + return value end - value = socket.read $1.to_i - socket.read 2 # "\r\n" - socket.gets # "END\r\n" - return value end ## # Fetches +cache_keys+ from +server+ using a multi-get. def cache_get_multi(server, cache_keys) - values = {} - socket = server.socket - socket.write "get #{cache_keys}\r\n" + with_socket_management(server) do |socket| + values = {} + socket.write "get #{cache_keys}\r\n" - while keyline = socket.gets do - return values if keyline == "END\r\n" + while keyline = socket.gets do + return values if keyline == "END\r\n" - unless keyline =~ /^VALUE (.+) (.+) (.+)/ then - server.close - raise MemCacheError, "unexpected response #{keyline.inspect}" + unless keyline =~ /^VALUE (.+) (.+) (.+)/ then + server.close + raise MemCacheError, "unexpected response #{keyline.inspect}" + end + + key, data_length = $1, $3 + values[$1] = socket.read data_length.to_i + socket.read(2) # "\r\n" end - key, data_length = $1, $3 - values[$1] = socket.read data_length.to_i - socket.read(2) # "\r\n" + server.close + raise MemCacheError, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too end - - server.close - raise MemCacheError, "lost connection to #{server.host}:#{server.port}" end ## # Performs a raw incr for +cache_key+ from +server+. Returns nil if not # found. def cache_incr(server, cache_key, amount) - socket = server.socket - socket.write "incr #{cache_key} #{amount}\r\n" - text = socket.gets - return nil if text == "NOT_FOUND\r\n" - return text.to_i + with_socket_management(server) do |socket| + socket.write "incr #{cache_key} #{amount}\r\n" + text = socket.gets + return nil if text == "NOT_FOUND\r\n" + return text.to_i + end end + + ## + # Gets or creates a socket connected to the given server, and yields it + # to the block. If a socket error (SocketError, SystemCallError, IOError) + # or protocol error (MemCacheError) is raised by the block, closes the + # socket, attempts to connect again, and retries the block (once). If + # an error is again raised, reraises it as MemCacheError. + # If unable to connect to the server (or if in the reconnect wait period), + # raises MemCacheError - note that the socket connect code marks a server + # dead for a timeout period, so retrying does not apply to connection attempt + # failures (but does still apply to unexpectedly lost connections etc.). + # Wraps the whole lot in mutex synchronization if @multithread is true. + def with_socket_management(server, &block) + @mutex.lock if @multithread + retried = false + begin + socket = server.socket + # Raise an IndexError to show this server is out of whack. + # We'll catch it in higher-level code and attempt to restart the operation. + raise IndexError, "No connection to server (#{server.status})" if socket.nil? + block.call(socket) + rescue MemCacheError, SocketError, SystemCallError, IOError => err + handle_error(server, err) if retried || socket.nil? + retried = true + retry + end + ensure + @mutex.unlock if @multithread + end + + def with_server(key) + retried = false + begin + server, cache_key = request_setup(key) + yield server, cache_key + rescue IndexError => e + if !retried && @servers.size > 1 + puts "Connection to server #{server.inspect} DIED! Retrying operation..." + retried = true + retry + end + handle_error(nil, e) + end + end + ## # Handles +error+ from +server+. def handle_error(server, error) + raise error if error.is_a?(MemCacheError) server.close if server new_error = MemCacheError.new error.message new_error.set_backtrace error.backtrace raise new_error end @@ -628,39 +635,10 @@ def request_setup(key) raise MemCacheError, 'No active servers' unless active? cache_key = make_cache_key key server = get_server_for_key cache_key - raise MemCacheError, 'No connection to server' if server.socket.nil? return server, cache_key - end - - def threadsafe_cache_decr(server, cache_key, amount) # :nodoc: - @mutex.lock - cache_decr server, cache_key, amount - ensure - @mutex.unlock - end - - def threadsafe_cache_get(server, cache_key) # :nodoc: - @mutex.lock - cache_get server, cache_key - ensure - @mutex.unlock - end - - def threadsafe_cache_get_multi(socket, cache_keys) # :nodoc: - @mutex.lock - cache_get_multi socket, cache_keys - ensure - @mutex.unlock - end - - def threadsafe_cache_incr(server, cache_key, amount) # :nodoc: - @mutex.lock - cache_incr server, cache_key, amount - ensure - @mutex.unlock end ## # This class represents a memcached server instance.