lib/memcache.rb in ninjudd-memcache-0.9.0 vs lib/memcache.rb in ninjudd-memcache-0.9.1

- old
+ new

@@ -1,138 +1,21 @@ -$TESTING = defined?($TESTING) && $TESTING - -require 'socket' -require 'thread' -require 'timeout' -require 'rubygems' -require File.dirname(__FILE__) + '/memcache_mock' require 'zlib' -<<<<<<< HEAD -## -# A Ruby client library for memcached. -# -# This is intended to provide access to basic memcached functionality. It -# does not attempt to be complete implementation of the entire API, but it is -# approaching a complete implementation. -======= $:.unshift(File.dirname(__FILE__)) require 'memcache/server' require 'memcache/local_server' require 'memcache/segmented_server' ->>>>>>> refactor -class MemCache - ## - # The version of MemCache you are using. +class Memcache + VERSION = '0.9.0' -<<<<<<< HEAD - VERSION = '1.5.0.3' unless defined? VERSION -======= DEFAULT_EXPIRY = 0 LOCK_TIMEOUT = 5 WRITE_LOCK_WAIT = 0.001 ->>>>>>> refactor - ## - # Default options for the cache object. + attr_reader :default_expiry, :default_namespace, :servers -<<<<<<< HEAD - DEFAULT_OPTIONS = { - :namespace => nil, - :readonly => false, - :multithread => false, - } unless defined? DEFAULT_OPTIONS - - ## - # Default memcached port. - - DEFAULT_PORT = 11211 unless defined? DEFAULT_PORT - - ## - # Default memcached server weight. - - DEFAULT_WEIGHT = 1 unless defined? DEFAULT_WEIGHT - - ## - # Default number of servers to try connecting to. - - DEFAULT_FALLBACK = 20 unless defined? DEFAULT_FALLBACK - - ## - # Default expiry if none is specified. - - DEFAULT_EXPIRY = 0 unless defined? DEFAULT_EXPIRY - - ## - # The amount of time to wait for a response from a memcached server. If a - # response is not completed within this time, the connection to the server - # will be closed and an error will be raised. - - attr_accessor :request_timeout - - ## - # The multithread setting for this instance - - attr_reader :multithread - - ## - # The servers this client talks to. Play at your own peril. - - attr_reader :servers - - ## - # Number of servers to try connecting to. - attr_reader :fallback - - ## - # Default expiry if none is specified. - attr_reader :default_expiry - - ## - # Accepts a list of +servers+ and a list of +opts+. +servers+ may be - # omitted. See +servers=+ for acceptable server list arguments. - # - # Valid options for +opts+ are: - # - # [:namespace] Prepends this value to all keys added or retrieved. - # [:readonly] Raises an exeception on cache writes when true. - # [:multithread] Wraps cache access in a Mutex for thread safety. - # [:fallback] Number of servers to try before failing. - # [:default_expiry] Default expiry if none is specified. - # Other options are ignored. - - def initialize(*args) - servers = [] - opts = {} - - case args.length - when 0 then # NOP - when 1 then - arg = args.shift - case arg - when Hash then opts = arg - when Array then servers = arg - when String then servers = [arg] - else raise ArgumentError, 'first argument must be Array, Hash or String' - end - when 2 then - servers, opts = args - else - raise ArgumentError, "wrong number of arguments (#{args.length} for 2)" - end - - opts = DEFAULT_OPTIONS.merge opts - @namespace = opts[:namespace] - @readonly = opts[:readonly] - @multithread = opts[:multithread] - @fallback = opts[:fallback] || DEFAULT_FALLBACK - @default_expiry = opts[:default_expiry].to_i || DEFAULT_EXPIRY - @mutex = Mutex.new if @multithread - @buckets = [] - self.servers = servers -======= def initialize(opts) @readonly = opts[:readonly] @default_expiry = opts[:default_expiry] || DEFAULT_EXPIRY @default_namespace = opts[:namespace] default_server = opts[:segment_large_values] ? SegmentedServer : Server @@ -152,85 +35,35 @@ end end def inspect "<Memcache: %d servers, ns: %p, ro: %p>" % [@servers.length, namespace, @readonly] ->>>>>>> refactor end - ## - # Returns the namespace for the current thread. - def namespace -<<<<<<< HEAD - Thread.current[:memcache_namespace] || @namespace -======= @namespace || default_namespace ->>>>>>> refactor end - ## - # Set the namespace for the current thread. - def namespace=(namespace) -<<<<<<< HEAD - if namespace == @namespace - Thread.current[:memcache_namespace] = nil -======= if default_namespace == namespace @namespace = nil ->>>>>>> refactor else @namespace = namespace end end - ## - # Returns a string representation of the cache object. - - def inspect - "<MemCache: %d servers, %d buckets, ns: %p, ro: %p>" % - [@servers.length, @buckets.length, namespace, @readonly] + def in_namespace(namespace) + # Temporarily change the namespace for convenience. + begin + old_namespace = self.namespace + self.namespace = "#{old_namespace}#{namespace}" + yield + ensure + self.namespace = old_namespace + end end -<<<<<<< HEAD - ## - # Returns whether there is at least one active server for the object. - - def active? - not @servers.empty? - end - - ## - # Returns whether or not the cache object was created read only. - - def readonly? - @readonly - end - - ## - # Set the servers that the requests will be distributed between. Entries - # can be either strings of the form "hostname:port" or - # "hostname:port:weight" or MemCache::Server objects. - - def servers=(servers) - # Create the server objects. - @servers = servers.collect do |server| - case server - when String - host, port, weight = server.split ':', 3 - port ||= DEFAULT_PORT - weight ||= DEFAULT_WEIGHT - Server.new self, host, port, weight - when Server - if server.memcache.multithread != @multithread then - raise ArgumentError, "can't mix threaded and non-threaded servers" - end - server - else - raise TypeError, "cannot convert #{server.class} into MemCache::Server" -======= def get(keys, opts = {}) raise 'opts must be hash' unless opts.kind_of?(Hash) if keys.kind_of?(Array) multi_get(keys, opts) @@ -240,95 +73,15 @@ if opts[:expiry] value = server(key).gets(key) server(key).cas(key, value, value.memcache_cas, opts[:expiry]) if value else value = server(key).get(key, opts[:cas]) ->>>>>>> refactor end unmarshal(value, opts) end -<<<<<<< HEAD - - # Create an array of server buckets for weight selection of servers. - @buckets = [] - @servers.each do |server| - server.weight.times { @buckets.push(server) } - end end - ## - # Decrements the value for +key+ by +amount+ and returns the new value. - # +key+ must already exist. If +key+ is not an integer, it is assumed to be - # 0. +key+ can not be decremented below 0. - - def decr(key, amount = 1) - raise MemCacheError, "Update of readonly cache" if @readonly - with_server(key) do |server, cache_key| - cache_decr server, cache_key, amount - end - rescue TypeError => err - handle_error nil, err - end - - ## - # Retrieves +key+ from memcache. If +raw+ is false, the value will be - # unmarshalled. - - def get(key, raw = false) - with_server(key) do |server, cache_key| - value = cache_get server, cache_key - return nil if value.nil? - value = Marshal.load(value) unless raw - return value - end - rescue TypeError => err - handle_error nil, err - end - - ## - # Retrieves multiple values from memcached in parallel, if possible. - # - # The memcached protocol supports the ability to retrieve multiple - # keys in a single request. Pass in an array of keys to this method - # and it will: - # - # 1. map the key to the appropriate memcached server - # 2. send a single request to each server that has one or more key values - # - # Returns a hash of values. - # - # cache["a"] = 1 - # cache["b"] = 2 - # cache.get_multi "a", "b" # => { "a" => 1, "b" => 2 } - - def get_multi(*keys) - raise MemCacheError, 'No active servers' unless active? - - opts = keys.last.kind_of?(Hash) ? keys.pop : {} - - keys.flatten! - key_count = keys.length - cache_keys = {} - server_keys = Hash.new { |h,k| h[k] = [] } - - # map keys to servers - keys.each do |key| - server, cache_key = request_setup key - cache_keys[cache_key] = key - server_keys[server] << cache_key - end - - results = {} - - server_keys.each do |server, keys| - keys = keys.join ' ' - values = cache_get_multi server, keys - values.each do |key, value| - results[cache_keys[key]] = opts[:raw] ? value : Marshal.load(value) -======= - end - def read(keys, opts = {}) get(keys, opts.merge(:raw => true)) end def set(key, value, opts = {}) @@ -447,77 +200,15 @@ method = opts[:overwrite] ? :set : :add if keys_to_fetch.any? yield(keys_to_fetch).each do |key, value| self.send(method, key, value, opts) unless opts[:disable] or opts[:disable_write] records[key] = value ->>>>>>> refactor end end - - return results - rescue TypeError => err - handle_error nil, err + records end -<<<<<<< HEAD - ## - # Increments the value for +key+ by +amount+ and returns the new value. - # +key+ must already exist. If +key+ is not an integer, it is assumed to be - # 0. - - def incr(key, amount = 1) - raise MemCacheError, "Update of readonly cache" if @readonly - with_server(key) do |server, cache_key| - cache_incr server, cache_key, amount - end - rescue TypeError => err - handle_error nil, err - end - - ## - # Add +key+ to the cache with value +value+ that expires in +expiry+ - # seconds. If +raw+ is true, +value+ will not be Marshalled. - # - # Warning: Readers should not call this method in the event of a cache miss; - # see MemCache#add. - - def set(key, value, expiry = default_expiry, raw = false) - raise MemCacheError, "Update of readonly cache" if @readonly - with_server(key) do |server, cache_key| - value = Marshal.dump value unless raw - cache_store(:set, cache_key, value, expiry, server) - end - end - - ## - # Add +key+ to the cache with value +value+ that expires in +expiry+ - # seconds, but only if +key+ does not already exist in the cache. - # If +raw+ is true, +value+ will not be Marshalled. - # - # Readers should call this method in the event of a cache miss, not - # MemCache#set or MemCache#[]=. - - def add(key, value, expiry = default_expiry, raw = false) - raise MemCacheError, "Update of readonly cache" if @readonly - with_server(key) do |server, cache_key| - value = Marshal.dump value unless raw - cache_store(:add, cache_key, value, expiry, server) - end - end - - ## - # Removes +key+ from the cache in +expiry+ seconds. - - def delete(key, expiry = default_expiry) - raise MemCacheError, "Update of readonly cache" if @readonly - server, cache_key = request_setup key - - with_socket_management(server) do |socket| - socket.write "delete #{cache_key} #{expiry}\r\n" - socket.gets - end -======= def lock(key, opts = {}) # Returns false if the lock already exists. expiry = opts[:expiry] || LOCK_TIMEOUT add(lock_key(key), Socket.gethostname, :expiry => expiry, :raw => true) end @@ -544,137 +235,43 @@ end def delete(key) key = cache_key(key) server(key).delete(key) ->>>>>>> refactor end - ## - # Flush the cache from all memcache servers. + def flush_all(opts = {}) + delay = opts[:delay].to_i + interval = opts[:interval].to_i - def flush_all - raise MemCacheError, 'No active servers' unless active? - raise MemCacheError, "Update of readonly cache" if @readonly - begin - @mutex.lock if @multithread - @servers.each do |server| - with_socket_management(server) do |socket| - socket.write "flush_all\r\n" - result = socket.gets - raise MemCacheError, $2.strip if result =~ /^(SERVER_)?ERROR(.*)/ - end - end - ensure - @mutex.unlock if @multithread + servers.each do |server| + server.flush_all(delay) + delay += interval end end - ## - # Reset the connection to all memcache servers. This should be called if - # there is a problem with a cache lookup that might have left the connection - # in a corrupted state. - def reset -<<<<<<< HEAD - @servers.each { |server| server.close } -======= servers.each {|server| server.close} ->>>>>>> refactor end - ## - # Returns statistics for each memcached server. An explanation of the - # statistics can be found in the memcached docs: - # - # http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt - # - # Example: - # - # >> pp CACHE.stats - # {"localhost:11211"=> - # {"bytes"=>4718, - # "pid"=>20188, - # "connection_structures"=>4, - # "time"=>1162278121, - # "pointer_size"=>32, - # "limit_maxbytes"=>67108864, - # "cmd_get"=>14532, - # "version"=>"1.2.0", - # "bytes_written"=>432583, - # "cmd_set"=>32, - # "get_misses"=>0, - # "total_connections"=>19, - # "curr_connections"=>3, - # "curr_items"=>4, - # "uptime"=>1557, - # "get_hits"=>14532, - # "total_items"=>32, - # "rusage_system"=>0.313952, - # "rusage_user"=>0.119981, - # "bytes_read"=>190619}} - # => nil - def stats - raise MemCacheError, "No active servers" unless active? - server_stats = {} - - @servers.each do |server| - next unless server.alive? - with_socket_management(server) do |socket| - value = nil # TODO: why is this line here? - socket.write "stats\r\n" - stats = {} - while line = socket.gets do - break if line == "END\r\n" - if line =~ /^STAT ([\w]+) ([\w\.\:]+)/ then - name, value = $1, $2 - stats[name] = case name - when 'version' - value - when 'rusage_user', 'rusage_system' then - seconds, microseconds = value.split(/:/, 2) - microseconds ||= 0 - Float(seconds) + (Float(microseconds) / 1_000_000) - else - if value =~ /^\d+$/ then - value.to_i - else - value - end - end - end - end - server_stats["#{server.host}:#{server.port}"] = stats - end + stats = {} + servers.each do |server| + stats[server.name] = server.stats end - - server_stats + stats end - ## - # Shortcut to get a value from the cache. - + alias clear flush_all alias [] get - ## - # Shortcut to save a value in the cache. This method does not set an - # expiration on the entry. Use set to specify an explicit expiry. - def []=(key, value) set(key, value) end - protected unless $TESTING +protected -<<<<<<< HEAD - ## - # Create a key for the cache, incorporating the namespace qualifier if - # requested. - - def make_cache_key(key) -======= def multi_get(keys, opts = {}) return {} if keys.empty? key_to_input_key = {} keys_by_server = Hash.new { |h,k| h[k] = [] } @@ -697,11 +294,10 @@ end results end def cache_key(key) ->>>>>>> refactor safe_key = key ? key.to_s.gsub(/%/, '%%').gsub(/ /, '%s') : key if namespace.nil? then safe_key else "#{namespace}:#{safe_key}" @@ -721,361 +317,19 @@ object rescue Exception => e nil end - ## - # Pick a server to handle the request based on a hash of the key. - - def get_server_for_key(key) - raise ArgumentError, "illegal character in key #{key.inspect}" if key =~ /\s/ + def server(key) raise ArgumentError, "key too long #{key.inspect}" if key.length > 250 - raise MemCacheError, "No servers available" if @servers.empty? - return @servers.first if @servers.length == 1 + return servers.first if servers.length == 1 - hkey = hash_for key - - fallback.times do |try| - server = @buckets[hkey % @buckets.nitems] - return server if server.alive? - hkey += hash_for "#{try}#{key}" - end - - raise MemCacheError, "No servers available" + n = Zlib.crc32(key) % servers.length + servers[n] end - ## - # Returns an interoperable hash value for +key+. (I think, docs are - # sketchy for down servers). - - def hash_for(key) - (Zlib.crc32(key) >> 16) & 0x7fff - end - - ## - # Performs a raw decr for +cache_key+ from +server+. Returns nil if not - # found. - - def cache_decr(server, cache_key, amount) - with_socket_management(server) do |socket| - socket.write "decr #{cache_key} #{amount}\r\n" - text = socket.gets - return nil if text == "NOT_FOUND\r\n" - return text.to_i - end - end - - ## - # Fetches the raw data for +cache_key+ from +server+. Returns nil on cache - # miss. - - def cache_get(server, cache_key) - with_socket_management(server) do |socket| - socket.write "get #{cache_key}\r\n" - keyline = socket.gets # "VALUE <key> <flags> <bytes>\r\n" - - if keyline.nil? then - server.close - raise MemCacheError, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too - end - - return nil if keyline == "END\r\n" - - unless keyline =~ /(\d+)\r/ then - server.close - raise MemCacheError, "unexpected response #{keyline.inspect}" - end - value = socket.read $1.to_i - socket.read 2 # "\r\n" - socket.gets # "END\r\n" - return value - end - end - - ## - # Fetches +cache_keys+ from +server+ using a multi-get. - - def cache_get_multi(server, cache_keys) - with_socket_management(server) do |socket| - values = {} - socket.write "get #{cache_keys}\r\n" - - while keyline = socket.gets do - return values if keyline == "END\r\n" - - unless keyline =~ /^VALUE (.+) (.+) (.+)/ then - server.close - raise MemCacheError, "unexpected response #{keyline.inspect}" - end - - key, data_length = $1, $3 - values[$1] = socket.read data_length.to_i - socket.read(2) # "\r\n" - end - - server.close - raise MemCacheError, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too - end - end - - ## - # Stores +value+ to +cache_keys+ on +server+ using +method+ (must be :set or :add). - - def cache_store(method, cache_key, value, expiry, server) - raise MemCacheError, "unknown store method #{method}" unless [:set, :add].include?(method) - - command = "#{method} #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n" - - with_socket_management(server) do |socket| - socket.write command - result = socket.gets - if result.nil? - server.close - raise MemCacheError, "lost connection to #{server.host}:#{server.port}" - end - - if result =~ /^SERVER_ERROR (.*)/ - server.close - raise MemCacheError, "%s:\n%s" % [$1.strip, Marshal.restore(value).inspect] - end - - result - end - end - - ## - # Performs a raw incr for +cache_key+ from +server+. Returns nil if not - # found. - - def cache_incr(server, cache_key, amount) - with_socket_management(server) do |socket| - socket.write "incr #{cache_key} #{amount}\r\n" - text = socket.gets - return nil if text == "NOT_FOUND\r\n" - return text.to_i - end - end - - ## - # Gets or creates a socket connected to the given server, and yields it - # to the block. If a socket error (SocketError, SystemCallError, IOError) - # or protocol error (MemCacheError) is raised by the block, closes the - # socket, attempts to connect again, and retries the block (once). If - # an error is again raised, reraises it as MemCacheError. - # If unable to connect to the server (or if in the reconnect wait period), - # raises MemCacheError - note that the socket connect code marks a server - # dead for a timeout period, so retrying does not apply to connection attempt - # failures (but does still apply to unexpectedly lost connections etc.). - # Wraps the whole lot in mutex synchronization if @multithread is true. - - def with_socket_management(server, &block) - @mutex.lock if @multithread - retried = false - begin - socket = server.socket - # Raise an IndexError to show this server is out of whack. - # We'll catch it in higher-level code and attempt to restart the operation. - raise IndexError, "No connection to server (#{server.status})" if socket.nil? - - block.call(socket) - rescue MemCacheError, SocketError, SystemCallError, IOError => err - handle_error(server, err) if retried || socket.nil? - retried = true - retry - rescue Exception => err - server.close - raise err - end - ensure - @mutex.unlock if @multithread - end - - def with_server(key) - retried = false - begin - server, cache_key = request_setup(key) - yield server, cache_key - rescue IndexError => e - if !retried && @servers.size > 1 - puts "Connection to server #{server.inspect} DIED! Retrying operation..." - retried = true - retry - end - handle_error(nil, e) - end - end - - ## - # Handles +error+ from +server+. - - def handle_error(server, error) - raise error if error.is_a?(MemCacheError) - server.close if server - new_error = MemCacheError.new error.message - new_error.set_backtrace error.backtrace - raise new_error - end - - ## - # Performs setup for making a request with +key+ from memcached. Returns - # the server to fetch the key from and the complete key to use. - - def request_setup(key) - raise MemCacheError, 'No active servers' unless active? - cache_key = make_cache_key key - server = get_server_for_key cache_key - return server, cache_key - end - - ## - # This class represents a memcached server instance. - - class Server - - ## - # The amount of time to wait to establish a connection with a memcached - # server. If a connection cannot be established within this time limit, - # the server will be marked as down. - - CONNECT_TIMEOUT = 1.0 unless defined? CONNECT_TIMEOUT - - ## - # The amount of time to wait before attempting to re-establish a - # connection with a server that is marked dead. - - RETRY_DELAY = 1.0 unless defined? RETRY_DELAY - - ## - # The host the memcached server is running on. - - attr_reader :host - - ## - # The port the memcached server is listening on. - - attr_reader :port - - ## - # The weight given to the server. - - attr_reader :weight - - ## - # The time of next retry if the connection is dead. - - attr_reader :retry - - ## - # A text status string describing the state of the server. - - attr_reader :status - - ## - # Create a new MemCache::Server object for the memcached instance - # listening on the given host and port, weighted by the given weight. - - def initialize(memcache, host, port = DEFAULT_PORT, weight = DEFAULT_WEIGHT) - raise ArgumentError, "No host specified" if host.nil? or host.empty? - raise ArgumentError, "No port specified" if port.nil? or port.to_i.zero? - - @memcache = memcache - @host = host - @port = port.to_i - @weight = weight.to_i - - @multithread = @memcache.multithread - @mutex = Mutex.new - - @sock = nil - @retry = nil - @status = 'NOT CONNECTED' - end - - ## - # Return a string representation of the server object. - - def inspect - "<MemCache::Server: %s:%d [%d] (%s)>" % [@host, @port, @weight, @status] - end - - ## - # Check whether the server connection is alive. This will cause the - # socket to attempt to connect if it isn't already connected and or if - # the server was previously marked as down and the retry time has - # been exceeded. - - def alive? - !!socket - end - - ## - # Try to connect to the memcached server targeted by this object. - # Returns the connected socket object on success or nil on failure. - - def socket - @mutex.lock if @multithread - return @sock if @sock and not @sock.closed? - - @sock = nil - - # If the host was dead, don't retry for a while. - return if @retry and @retry > Time.now - - # Attempt to connect if not already connected. - begin - @sock = timeout CONNECT_TIMEOUT do - TCPSocket.new @host, @port - end - if Socket.constants.include? 'TCP_NODELAY' then - @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 - end - @retry = nil - @status = 'CONNECTED' - rescue SocketError, SystemCallError, IOError, Timeout::Error => err - mark_dead err.message - end - - return @sock - ensure - @mutex.unlock if @multithread - end - - ## - # Close the connection to the memcached server targeted by this - # object. The server is not considered dead. - - def close - @mutex.lock if @multithread - @sock.close if @sock && !@sock.closed? - @sock = nil - @retry = nil - @status = "NOT CONNECTED" - ensure - @mutex.unlock if @multithread - end - - private - - ## - # Mark the server as dead and close its socket. - - def mark_dead(reason = "Unknown error") - @sock.close if @sock && !@sock.closed? - @sock = nil - @retry = Time.now + RETRY_DELAY - - @status = sprintf "DEAD: %s, will retry at %s", reason, @retry - end - - end - - ## - # Base MemCache exception class. - - class MemCacheError < RuntimeError; end - - class CachePool + class Pool attr_reader :fallback def initialize @cache_by_scope = {} @cache_by_scope[:default] = Memcache.new(:server => Memcache::LocalServer) @@ -1095,24 +349,16 @@ end def []=(scope, cache) @cache_by_scope[scope.to_sym] = cache end - - def reset - @cache_by_scope.values.each {|c| c.reset} - end end def self.pool - @@cache_pool ||= CachePool.new + @@cache_pool ||= Pool.new end - end -<<<<<<< HEAD -======= # Add flags and cas class Object attr_accessor :memcache_flags, :memcache_cas end ->>>>>>> refactor