lib/memcache.rb in fiveruns-memcache-client-1.5.0.1 vs lib/memcache.rb in fiveruns-memcache-client-1.5.0.3
- old
+ new
@@ -7,29 +7,37 @@
class String
##
# Uses the ITU-T polynomial in the CRC32 algorithm.
+ begin
+ require 'crc32'
+ def crc32_ITU_T
+ CRC32.itu_t(self)
+ end
+ rescue LoadError => e
+ puts "Loading with slow CRC32 ITU-T implementation: #{e.message}"
+
+ def crc32_ITU_T
+ n = length
+ r = 0xFFFFFFFF
- def crc32_ITU_T
- n = length
- r = 0xFFFFFFFF
-
- n.times do |i|
- r ^= self[i]
- 8.times do
- if (r & 1) != 0 then
- r = (r>>1) ^ 0xEDB88320
- else
- r >>= 1
+ n.times do |i|
+ r ^= self[i]
+ 8.times do
+ if (r & 1) != 0 then
+ r = (r>>1) ^ 0xEDB88320
+ else
+ r >>= 1
+ end
end
end
- end
- r ^ 0xFFFFFFFF
+ r ^ 0xFFFFFFFF
+ end
end
-
+
end
##
# A Ruby client library for memcached.
#
@@ -178,45 +186,35 @@
server.weight.times { @buckets.push(server) }
end
end
##
- # Deceremets the value for +key+ by +amount+ and returns the new value.
+ # Decrements the value for +key+ by +amount+ and returns the new value.
# +key+ must already exist. If +key+ is not an integer, it is assumed to be
# 0. +key+ can not be decremented below 0.
def decr(key, amount = 1)
- server, cache_key = request_setup key
-
- if @multithread then
- threadsafe_cache_decr server, cache_key, amount
- else
+ raise MemCacheError, "Update of readonly cache" if @readonly
+ with_server(key) do |server, cache_key|
cache_decr server, cache_key, amount
end
- rescue TypeError, SocketError, SystemCallError, IOError => err
+ rescue TypeError => err
handle_error server, err
end
##
# Retrieves +key+ from memcache. If +raw+ is false, the value will be
# unmarshalled.
def get(key, raw = false)
- server, cache_key = request_setup key
-
- value = if @multithread then
- threadsafe_cache_get server, cache_key
- else
- cache_get server, cache_key
- end
-
- return nil if value.nil?
-
- value = Marshal.load value unless raw
-
- return value
- rescue TypeError, SocketError, SystemCallError, IOError => err
+ with_server(key) do |server, cache_key|
+ value = cache_get server, cache_key
+ return nil if value.nil?
+ value = Marshal.load value unless raw
+ return value
+ end
+ rescue TypeError => err
handle_error server, err
end
##
# Retrieves multiple values from memcached in parallel, if possible.
@@ -251,75 +249,62 @@
results = {}
server_keys.each do |server, keys|
keys = keys.join ' '
- values = if @multithread then
- threadsafe_cache_get_multi server, keys
- else
- cache_get_multi server, keys
- end
+ values = cache_get_multi server, keys
values.each do |key, value|
results[cache_keys[key]] = Marshal.load value
end
end
return results
- rescue TypeError, SocketError, SystemCallError, IOError => err
+ rescue TypeError => err
handle_error server, err
end
##
- # Increments the value for +key+ by +amount+ and retruns the new value.
+ # Increments the value for +key+ by +amount+ and returns the new value.
# +key+ must already exist. If +key+ is not an integer, it is assumed to be
# 0.
def incr(key, amount = 1)
- server, cache_key = request_setup key
-
- if @multithread then
- threadsafe_cache_incr server, cache_key, amount
- else
+ raise MemCacheError, "Update of readonly cache" if @readonly
+ with_server(key) do |server, cache_key|
cache_incr server, cache_key, amount
end
- rescue TypeError, SocketError, SystemCallError, IOError => err
+ rescue TypeError => err
handle_error server, err
end
-
+
##
# Add +key+ to the cache with value +value+ that expires in +expiry+
# seconds. If +raw+ is true, +value+ will not be Marshalled.
#
# Warning: Readers should not call this method in the event of a cache miss;
# see MemCache#add.
def set(key, value, expiry = 0, raw = false)
raise MemCacheError, "Update of readonly cache" if @readonly
- server, cache_key = request_setup key
- socket = server.socket
+ with_server(key) do |server, cache_key|
- value = Marshal.dump value unless raw
- command = "set #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n"
+ value = Marshal.dump value unless raw
+ command = "set #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n"
- begin
- @mutex.lock if @multithread
- socket.write command
- result = socket.gets
- if result.nil?
- server.close
- raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
- end
+ with_socket_management(server) do |socket|
+ socket.write command
+ result = socket.gets
+ if result.nil?
+ server.close
+ raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
+ end
- if result =~ /^SERVER_ERROR (.*)/
- server.close
- raise MemCacheError, $1.strip
- end
- rescue SocketError, SystemCallError, IOError => err
- server.close
- raise MemCacheError, err.message
- ensure
- @mutex.unlock if @multithread
+ if result =~ /^SERVER_ERROR (.*)/
+ server.close
+ raise MemCacheError, $1.strip
+ end
+ end
end
end
##
# Add +key+ to the cache with value +value+ that expires in +expiry+
@@ -329,50 +314,32 @@
# Readers should call this method in the event of a cache miss, not
# MemCache#set or MemCache#[]=.
def add(key, value, expiry = 0, raw = false)
raise MemCacheError, "Update of readonly cache" if @readonly
- server, cache_key = request_setup key
- socket = server.socket
+ with_server(key) do |server, cache_key|
+ value = Marshal.dump value unless raw
+ command = "add #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n"
- value = Marshal.dump value unless raw
- command = "add #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n"
-
- begin
- @mutex.lock if @multithread
- socket.write command
- socket.gets
- rescue SocketError, SystemCallError, IOError => err
- server.close
- raise MemCacheError, err.message
- ensure
- @mutex.unlock if @multithread
+ with_socket_management(server) do |socket|
+ socket.write command
+ socket.gets
+ end
end
end
-
+
##
# Removes +key+ from the cache in +expiry+ seconds.
def delete(key, expiry = 0)
- @mutex.lock if @multithread
+ raise MemCacheError, "Update of readonly cache" if @readonly
+ server, cache_key = request_setup key
- raise MemCacheError, "No active servers" unless active?
- cache_key = make_cache_key key
- server = get_server_for_key cache_key
-
- sock = server.socket
- raise MemCacheError, "No connection to server" if sock.nil?
-
- begin
- sock.write "delete #{cache_key} #{expiry}\r\n"
- sock.gets
- rescue SocketError, SystemCallError, IOError => err
- server.close
- raise MemCacheError, err.message
+ with_socket_management(server) do |socket|
+ socket.write "delete #{cache_key} #{expiry}\r\n"
+ socket.gets
end
- ensure
- @mutex.unlock if @multithread
end
##
# Flush the cache from all memcache servers.
@@ -380,19 +347,14 @@
raise MemCacheError, 'No active servers' unless active?
raise MemCacheError, "Update of readonly cache" if @readonly
begin
@mutex.lock if @multithread
@servers.each do |server|
- begin
- sock = server.socket
- raise MemCacheError, "No connection to server" if sock.nil?
- sock.write "flush_all\r\n"
- result = sock.gets
+ with_socket_management(server) do |socket|
+ socket.write "flush_all\r\n"
+ result = socket.gets
raise MemCacheError, $2.strip if result =~ /^(SERVER_)?ERROR(.*)/
- rescue SocketError, SystemCallError, IOError => err
- server.close
- raise MemCacheError, err.message
end
end
ensure
@mutex.unlock if @multithread
end
@@ -442,18 +404,16 @@
def stats
raise MemCacheError, "No active servers" unless active?
server_stats = {}
@servers.each do |server|
- sock = server.socket
- raise MemCacheError, "No connection to server" if sock.nil?
-
- value = nil
- begin
- sock.write "stats\r\n"
+ next unless server.alive?
+ with_socket_management(server) do |socket|
+ value = nil # TODO: why is this line here?
+ socket.write "stats\r\n"
stats = {}
- while line = sock.gets do
+ while line = socket.gets do
break if line == "END\r\n"
if line =~ /^STAT ([\w]+) ([\w\.\:]+)/ then
name, value = $1, $2
stats[name] = case name
when 'version'
@@ -470,13 +430,10 @@
end
end
end
end
server_stats["#{server.host}:#{server.port}"] = stats
- rescue SocketError, SystemCallError, IOError => err
- server.close
- raise MemCacheError, err.message
end
end
server_stats
end
@@ -540,84 +497,134 @@
##
# Performs a raw decr for +cache_key+ from +server+. Returns nil if not
# found.
def cache_decr(server, cache_key, amount)
- socket = server.socket
- socket.write "decr #{cache_key} #{amount}\r\n"
- text = socket.gets
- return nil if text == "NOT_FOUND\r\n"
- return text.to_i
+ with_socket_management(server) do |socket|
+ socket.write "decr #{cache_key} #{amount}\r\n"
+ text = socket.gets
+ return nil if text == "NOT_FOUND\r\n"
+ return text.to_i
+ end
end
##
# Fetches the raw data for +cache_key+ from +server+. Returns nil on cache
# miss.
def cache_get(server, cache_key)
- socket = server.socket
- socket.write "get #{cache_key}\r\n"
- keyline = socket.gets # "VALUE <key> <flags> <bytes>\r\n"
+ with_socket_management(server) do |socket|
+ socket.write "get #{cache_key}\r\n"
+ keyline = socket.gets # "VALUE <key> <flags> <bytes>\r\n"
- if keyline.nil? then
- server.close
- raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
- end
+ if keyline.nil? then
+ server.close
+ raise MemCacheError, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too
+ end
- return nil if keyline == "END\r\n"
+ return nil if keyline == "END\r\n"
- unless keyline =~ /(\d+)\r/ then
- server.close
- raise MemCacheError, "unexpected response #{keyline.inspect}"
+ unless keyline =~ /(\d+)\r/ then
+ server.close
+ raise MemCacheError, "unexpected response #{keyline.inspect}"
+ end
+ value = socket.read $1.to_i
+ socket.read 2 # "\r\n"
+ socket.gets # "END\r\n"
+ return value
end
- value = socket.read $1.to_i
- socket.read 2 # "\r\n"
- socket.gets # "END\r\n"
- return value
end
##
# Fetches +cache_keys+ from +server+ using a multi-get.
def cache_get_multi(server, cache_keys)
- values = {}
- socket = server.socket
- socket.write "get #{cache_keys}\r\n"
+ with_socket_management(server) do |socket|
+ values = {}
+ socket.write "get #{cache_keys}\r\n"
- while keyline = socket.gets do
- return values if keyline == "END\r\n"
+ while keyline = socket.gets do
+ return values if keyline == "END\r\n"
- unless keyline =~ /^VALUE (.+) (.+) (.+)/ then
- server.close
- raise MemCacheError, "unexpected response #{keyline.inspect}"
+ unless keyline =~ /^VALUE (.+) (.+) (.+)/ then
+ server.close
+ raise MemCacheError, "unexpected response #{keyline.inspect}"
+ end
+
+ key, data_length = $1, $3
+ values[$1] = socket.read data_length.to_i
+ socket.read(2) # "\r\n"
end
- key, data_length = $1, $3
- values[$1] = socket.read data_length.to_i
- socket.read(2) # "\r\n"
+ server.close
+ raise MemCacheError, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too
end
-
- server.close
- raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
end
##
# Performs a raw incr for +cache_key+ from +server+. Returns nil if not
# found.
def cache_incr(server, cache_key, amount)
- socket = server.socket
- socket.write "incr #{cache_key} #{amount}\r\n"
- text = socket.gets
- return nil if text == "NOT_FOUND\r\n"
- return text.to_i
+ with_socket_management(server) do |socket|
+ socket.write "incr #{cache_key} #{amount}\r\n"
+ text = socket.gets
+ return nil if text == "NOT_FOUND\r\n"
+ return text.to_i
+ end
end
+
+ ##
+ # Gets or creates a socket connected to the given server, and yields it
+ # to the block. If a socket error (SocketError, SystemCallError, IOError)
+ # or protocol error (MemCacheError) is raised by the block, closes the
+ # socket, attempts to connect again, and retries the block (once). If
+ # an error is again raised, reraises it as MemCacheError.
+ # If unable to connect to the server (or if in the reconnect wait period),
+ # raises MemCacheError - note that the socket connect code marks a server
+ # dead for a timeout period, so retrying does not apply to connection attempt
+ # failures (but does still apply to unexpectedly lost connections etc.).
+ # Wraps the whole lot in mutex synchronization if @multithread is true.
+ def with_socket_management(server, &block)
+ @mutex.lock if @multithread
+ retried = false
+ begin
+ socket = server.socket
+ # Raise an IndexError to show this server is out of whack.
+ # We'll catch it in higher-level code and attempt to restart the operation.
+ raise IndexError, "No connection to server (#{server.status})" if socket.nil?
+ block.call(socket)
+ rescue MemCacheError, SocketError, SystemCallError, IOError => err
+ handle_error(server, err) if retried || socket.nil?
+ retried = true
+ retry
+ end
+ ensure
+ @mutex.unlock if @multithread
+ end
+
+ def with_server(key)
+ retried = false
+ begin
+ server, cache_key = request_setup(key)
+ yield server, cache_key
+ rescue IndexError => e
+ if !retried && @servers.size > 1
+ puts "Connection to server #{server.inspect} DIED! Retrying operation..."
+ retried = true
+ retry
+ end
+ handle_error(nil, e)
+ end
+ end
+
##
# Handles +error+ from +server+.
def handle_error(server, error)
+ raise error if error.is_a?(MemCacheError)
server.close if server
new_error = MemCacheError.new error.message
new_error.set_backtrace error.backtrace
raise new_error
end
@@ -628,39 +635,10 @@
def request_setup(key)
raise MemCacheError, 'No active servers' unless active?
cache_key = make_cache_key key
server = get_server_for_key cache_key
- raise MemCacheError, 'No connection to server' if server.socket.nil?
return server, cache_key
- end
-
- def threadsafe_cache_decr(server, cache_key, amount) # :nodoc:
- @mutex.lock
- cache_decr server, cache_key, amount
- ensure
- @mutex.unlock
- end
-
- def threadsafe_cache_get(server, cache_key) # :nodoc:
- @mutex.lock
- cache_get server, cache_key
- ensure
- @mutex.unlock
- end
-
- def threadsafe_cache_get_multi(socket, cache_keys) # :nodoc:
- @mutex.lock
- cache_get_multi socket, cache_keys
- ensure
- @mutex.unlock
- end
-
- def threadsafe_cache_incr(server, cache_key, amount) # :nodoc:
- @mutex.lock
- cache_incr server, cache_key, amount
- ensure
- @mutex.unlock
end
##
# This class represents a memcached server instance.