lib/memcache.rb in memcache-client-1.6.5 vs lib/memcache.rb in memcache-client-1.7.0

- old
+ new

@@ -31,11 +31,11 @@ class MemCache ## # The version of MemCache you are using. - VERSION = '1.6.5' + VERSION = '1.7.0' ## # Default options for the cache object. DEFAULT_OPTIONS = { @@ -43,10 +43,11 @@ :readonly => false, :multithread => true, :failover => true, :timeout => 0.5, :logger => nil, + :no_reply => false, } ## # Default memcached port. @@ -88,10 +89,16 @@ # Log debug/info/warn/error to the given Logger, defaults to nil. attr_reader :logger ## + # Don't send or look for a reply from the memcached server for write operations. + # Please note this feature only works in memcached 1.2.5 and later. Earlier + # versions will reply with "ERROR". + attr_reader :no_reply + + ## # Accepts a list of +servers+ and a list of +opts+. +servers+ may be # omitted. See +servers=+ for acceptable server list arguments. # # Valid options for +opts+ are: # @@ -102,10 +109,13 @@ # first server is down? Defaults to true. # [:timeout] Time to use as the socket read timeout. Defaults to 0.5 sec, # set to nil to disable timeouts (this is a major performance penalty in Ruby 1.8, # "gem install SystemTimer' to remove most of the penalty). # [:logger] Logger to use for info/debug output, defaults to nil + # [:no_reply] Don't bother looking for a reply for write operations (i.e. they + # become 'fire and forget'), memcached 1.2.5 and later only, speeds up + # set/add/delete/incr/decr significantly. # # Other options are ignored. def initialize(*args) servers = [] @@ -132,10 +142,11 @@ @readonly = opts[:readonly] @multithread = opts[:multithread] @timeout = opts[:timeout] @failover = opts[:failover] @logger = opts[:logger] + @no_reply = opts[:no_reply] @mutex = Mutex.new if @multithread logger.info { "memcache-client #{VERSION} #{Array(servers).inspect}" } if logger Thread.current[:memcache_client] = self.object_id if !@multithread @@ -210,21 +221,40 @@ # Retrieves +key+ from memcache. If +raw+ is false, the value will be # unmarshalled. def get(key, raw = false) with_server(key) do |server, cache_key| + logger.debug { "get #{key} from #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger value = cache_get server, cache_key - logger.debug { "GET #{key} from #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger return nil if value.nil? value = Marshal.load value unless raw return value end rescue TypeError => err handle_error nil, err end ## + # Performs a +get+ with the given +key+. If + # the value does not exist and a block was given, + # the block will be called and the result saved via +add+. + # + # If you do not provide a block, using this + # method is the same as using +get+. + # + def fetch(key, expiry = 0, raw = false) + value = get(key, raw) + + if value.nil? && block_given? + value = yield + add(key, value, expiry, raw) + end + + value + end + + ## # Retrieves multiple values from memcached in parallel, if possible. # # The memcached protocol supports the ability to retrieve multiple # keys in a single request. Pass in an array of keys to this method # and it will: @@ -301,19 +331,20 @@ def set(key, value, expiry = 0, raw = false) raise MemCacheError, "Update of readonly cache" if @readonly with_server(key) do |server, cache_key| value = Marshal.dump value unless raw - logger.debug { "SET #{key} to #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger - data = value.to_s + logger.debug { "set #{key} to #{server.inspect}: #{data.size}" } if logger + raise MemCacheError, "Value too large, memcached can only store 1MB of data per key" if data.size > ONE_MB - command = "set #{cache_key} 0 #{expiry} #{data.size}\r\n#{data}\r\n" + command = "set #{cache_key} 0 #{expiry} #{data.size}#{noreply}\r\n#{data}\r\n" with_socket_management(server) do |socket| socket.write command + break nil if @no_reply result = socket.gets raise_on_error_response! result if result.nil? server.close @@ -324,63 +355,183 @@ end end end ## + # "cas" is a check and set operation which means "store this data but + # only if no one else has updated since I last fetched it." This can + # be used as a form of optimistic locking. + # + # Works in block form like so: + # cache.cas('some-key') do |value| + # value + 1 + # end + # + # Returns: + # +nil+ if the value was not found on the memcached server. + # +STORED+ if the value was updated successfully + # +EXISTS+ if the value was updated by someone else since last fetch + + def cas(key, expiry=0, raw=false) + raise MemCacheError, "Update of readonly cache" if @readonly + raise MemCacheError, "A block is required" unless block_given? + + (value, token) = gets(key, raw) + return nil unless value + updated = yield value + + with_server(key) do |server, cache_key| + logger.debug { "cas #{key} to #{server.inspect}: #{data.size}" } if logger + + value = Marshal.dump updated unless raw + data = value.to_s + command = "cas #{cache_key} 0 #{expiry} #{value.size} #{token}#{noreply}\r\n#{value}\r\n" + + with_socket_management(server) do |socket| + socket.write command + break nil if @no_reply + result = socket.gets + raise_on_error_response! result + + if result.nil? + server.close + raise MemCacheError, "lost connection to #{server.host}:#{server.port}" + end + + result + end + end + end + + ## # Add +key+ to the cache with value +value+ that expires in +expiry+ # seconds, but only if +key+ does not already exist in the cache. # If +raw+ is true, +value+ will not be Marshalled. # # Readers should call this method in the event of a cache miss, not - # MemCache#set or MemCache#[]=. + # MemCache#set. def add(key, value, expiry = 0, raw = false) raise MemCacheError, "Update of readonly cache" if @readonly with_server(key) do |server, cache_key| value = Marshal.dump value unless raw - logger.debug { "ADD #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger - command = "add #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n" + logger.debug { "add #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger + command = "add #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n" with_socket_management(server) do |socket| socket.write command + break nil if @no_reply result = socket.gets raise_on_error_response! result result end end end + + ## + # Add +key+ to the cache with value +value+ that expires in +expiry+ + # seconds, but only if +key+ already exists in the cache. + # If +raw+ is true, +value+ will not be Marshalled. + def replace(key, value, expiry = 0, raw = false) + raise MemCacheError, "Update of readonly cache" if @readonly + with_server(key) do |server, cache_key| + value = Marshal.dump value unless raw + logger.debug { "replace #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger + command = "replace #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n" + with_socket_management(server) do |socket| + socket.write command + break nil if @no_reply + result = socket.gets + raise_on_error_response! result + result + end + end + end + ## + # Append - 'add this data to an existing key after existing data' + # Please note the value is always passed to memcached as raw since it + # doesn't make a lot of sense to concatenate marshalled data together. + def append(key, value) + raise MemCacheError, "Update of readonly cache" if @readonly + with_server(key) do |server, cache_key| + logger.debug { "append #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger + command = "append #{cache_key} 0 0 #{value.to_s.size}#{noreply}\r\n#{value}\r\n" + + with_socket_management(server) do |socket| + socket.write command + break nil if @no_reply + result = socket.gets + raise_on_error_response! result + result + end + end + end + + ## + # Prepend - 'add this data to an existing key before existing data' + # Please note the value is always passed to memcached as raw since it + # doesn't make a lot of sense to concatenate marshalled data together. + def prepend(key, value) + raise MemCacheError, "Update of readonly cache" if @readonly + with_server(key) do |server, cache_key| + logger.debug { "prepend #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger + command = "prepend #{cache_key} 0 0 #{value.to_s.size}#{noreply}\r\n#{value}\r\n" + + with_socket_management(server) do |socket| + socket.write command + break nil if @no_reply + result = socket.gets + raise_on_error_response! result + result + end + end + end + + ## # Removes +key+ from the cache in +expiry+ seconds. def delete(key, expiry = 0) raise MemCacheError, "Update of readonly cache" if @readonly with_server(key) do |server, cache_key| with_socket_management(server) do |socket| - socket.write "delete #{cache_key} #{expiry}\r\n" + logger.debug { "delete #{cache_key} on #{server}" } if logger + socket.write "delete #{cache_key} #{expiry}#{noreply}\r\n" + break nil if @no_reply result = socket.gets raise_on_error_response! result result end end end ## # Flush the cache from all memcache servers. + # A non-zero value for +delay+ will ensure that the flush + # is propogated slowly through your memcached server farm. + # The Nth server will be flushed N*delay seconds from now, + # asynchronously so this method returns quickly. + # This prevents a huge database spike due to a total + # flush all at once. - def flush_all + def flush_all(delay=0) raise MemCacheError, 'No active servers' unless active? raise MemCacheError, "Update of readonly cache" if @readonly begin + delay_time = 0 @servers.each do |server| with_socket_management(server) do |socket| - socket.write "flush_all\r\n" + logger.debug { "flush_all #{delay_time} on #{server}" } if logger + socket.write "flush_all #{delay_time}#{noreply}\r\n" + break nil if @no_reply result = socket.gets raise_on_error_response! result result end + delay_time += delay end rescue IndexError => err handle_error nil, err end end @@ -528,11 +679,12 @@ # Performs a raw decr for +cache_key+ from +server+. Returns nil if not # found. def cache_decr(server, cache_key, amount) with_socket_management(server) do |socket| - socket.write "decr #{cache_key} #{amount}\r\n" + socket.write "decr #{cache_key} #{amount}#{noreply}\r\n" + break nil if @no_reply text = socket.gets raise_on_error_response! text return nil if text == "NOT_FOUND\r\n" return text.to_i end @@ -564,10 +716,42 @@ socket.gets # "END\r\n" return value end end + def gets(key, raw = false) + with_server(key) do |server, cache_key| + logger.debug { "gets #{key} from #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger + result = with_socket_management(server) do |socket| + socket.write "gets #{cache_key}\r\n" + keyline = socket.gets # "VALUE <key> <flags> <bytes> <cas token>\r\n" + + if keyline.nil? then + server.close + raise MemCacheError, "lost connection to #{server.host}:#{server.port}" + end + + raise_on_error_response! keyline + return nil if keyline == "END\r\n" + + unless keyline =~ /(\d+) (\w+)\r/ then + server.close + raise MemCacheError, "unexpected response #{keyline.inspect}" + end + value = socket.read $1.to_i + socket.read 2 # "\r\n" + socket.gets # "END\r\n" + [value, $2] + end + result[0] = Marshal.load result[0] unless raw + result + end + rescue TypeError => err + handle_error nil, err + end + + ## # Fetches +cache_keys+ from +server+ using a multi-get. def cache_get_multi(server, cache_keys) with_socket_management(server) do |socket| @@ -597,11 +781,12 @@ # Performs a raw incr for +cache_key+ from +server+. Returns nil if not # found. def cache_incr(server, cache_key, amount) with_socket_management(server) do |socket| - socket.write "incr #{cache_key} #{amount}\r\n" + socket.write "incr #{cache_key} #{amount}#{noreply}\r\n" + break nil if @no_reply text = socket.gets raise_on_error_response! text return nil if text == "NOT_FOUND\r\n" return text.to_i end @@ -675,9 +860,13 @@ raise error if error.is_a?(MemCacheError) server.close if server new_error = MemCacheError.new error.message new_error.set_backtrace error.backtrace raise new_error + end + + def noreply + @no_reply ? ' noreply' : '' end ## # Performs setup for making a request with +key+ from memcached. Returns # the server to fetch the key from and the complete key to use.