$TESTING = defined?($TESTING) && $TESTING

# NOTE(review): this file still contains unresolved merge-conflict markers
# (HEAD vs. refactor). Ruby cannot parse it until every conflict is resolved.

require 'socket'
require 'thread'
require 'timeout'
require 'rubygems'
require File.dirname(__FILE__) + '/memcache_mock'
require 'zlib'

<<<<<<< HEAD
##
# A Ruby client library for memcached.
#
# This is intended to provide access to basic memcached functionality. It
# does not attempt to be a complete implementation of the entire API, but it
# is approaching a complete implementation.
=======
# Put this file's directory on the load path so the memcache/* files resolve.
$:.unshift(File.dirname(__FILE__))
require 'memcache/server'
require 'memcache/local_server'
require 'memcache/segmented_server'
>>>>>>> refactor

class MemCache

  ##
  # The version of MemCache you are using.

<<<<<<< HEAD
  VERSION = '1.5.0.3' unless defined? VERSION
=======
  # Expiry passed to the server when the caller gives none.
  DEFAULT_EXPIRY  = 0
  # Default :expiry used by #lock.
  LOCK_TIMEOUT    = 5
  # Seconds slept between lock attempts in #with_lock.
  WRITE_LOCK_WAIT = 0.001
>>>>>>> refactor

  ##
  # Default options for the cache object.

<<<<<<< HEAD
  DEFAULT_OPTIONS = {
    :namespace   => nil,
    :readonly    => false,
    :multithread => false,
  } unless defined? DEFAULT_OPTIONS

  ##
  # Default memcached port.

  DEFAULT_PORT = 11211 unless defined? DEFAULT_PORT

  ##
  # Default memcached server weight.

  DEFAULT_WEIGHT = 1 unless defined? DEFAULT_WEIGHT

  ##
  # Default number of servers to try connecting to.

  DEFAULT_FALLBACK = 20 unless defined? DEFAULT_FALLBACK

  ##
  # Default expiry if none is specified.

  DEFAULT_EXPIRY = 0 unless defined? DEFAULT_EXPIRY

  ##
  # The amount of time to wait for a response from a memcached server. If a
  # response is not completed within this time, the connection to the server
  # will be closed and an error will be raised.
  # NOTE(review): nothing in this file reads request_timeout - confirm it is
  # still honoured somewhere.

  attr_accessor :request_timeout

  ##
  # The multithread setting for this instance

  attr_reader :multithread

  ##
  # The servers this client talks to. Play at your own peril.

  attr_reader :servers

  ##
  # Number of servers to try connecting to.

  attr_reader :fallback

  ##
  # Default expiry if none is specified.

  attr_reader :default_expiry

  ##
  # Accepts a list of +servers+ and a list of +opts+. +servers+ may be
  # omitted. See +servers=+ for acceptable server list arguments.
  #
  # Valid options for +opts+ are:
  #
  #   [:namespace]      Prepends this value to all keys added or retrieved.
  #   [:readonly]       Raises an exception on cache writes when true.
  #   [:multithread]    Wraps cache access in a Mutex for thread safety.
  #   [:fallback]       Number of servers to try before failing.
  #   [:default_expiry] Default expiry if none is specified.
  #
  # Other options are ignored.
  #
  # Raises ArgumentError when more than two arguments are given, or when a
  # single argument is not an Array, Hash or String.

  def initialize(*args)
    servers = []
    opts = {}

    # Accepted call shapes: (), (opts), (servers), (server_string), (servers, opts).
    case args.length
    when 0 then # NOP
    when 1 then
      arg = args.shift
      case arg
      when Hash   then opts = arg
      when Array  then servers = arg
      when String then servers = [arg]
      else raise ArgumentError, 'first argument must be Array, Hash or String'
      end
    when 2 then
      servers, opts = args
    else
      raise ArgumentError, "wrong number of arguments (#{args.length} for 2)"
    end

    opts = DEFAULT_OPTIONS.merge opts
    @namespace   = opts[:namespace]
    @readonly    = opts[:readonly]
    @multithread = opts[:multithread]
    @fallback    = opts[:fallback] || DEFAULT_FALLBACK
    # NOTE(review): #to_i never returns nil/false, so the `|| DEFAULT_EXPIRY`
    # fallback is unreachable; a missing option becomes nil.to_i == 0.
    @default_expiry = opts[:default_expiry].to_i || DEFAULT_EXPIRY
    # The mutex only exists in multithread mode.
    @mutex       = Mutex.new if @multithread
    @buckets     = []
    self.servers = servers
=======
  # Builds the client from an options hash. Reads :readonly, :default_expiry,
  # :namespace, :segment_large_values, :servers / :server and :strict_reads.
  def initialize(opts)
    @readonly          = opts[:readonly]
    @default_expiry    = opts[:default_expiry] || DEFAULT_EXPIRY
    @default_namespace = opts[:namespace]

    default_server = opts[:segment_large_values] ? SegmentedServer : Server

    # Each entry may be a Hash of server options, a "host:port" String, a
    # Class to instantiate, or (falling through the case) an object used as-is.
    @servers = (opts[:servers] || [ opts[:server] ]).collect do |server|
      case server
      when Hash
        server = default_server.new(server)
      when String
        host, port = server.split(':')
        server = default_server.new(:host => host, :port => port)
      when Class
        server = server.new
      end
      server.strict_reads = true if opts[:strict_reads] and server.respond_to?(:strict_reads=)
      server
    end
  end

  # Returns a short string representation of the cache object.
  def inspect
    "<Memcache: %d servers, ns: %p, ro: %p>" % [@servers.length, namespace, @readonly]
>>>>>>> refactor
  end

  ##
  # Returns the namespace for the current thread.
# NOTE(review): unresolved merge-conflict markers (HEAD vs. refactor) follow; this section cannot be parsed until they are resolved.
  def namespace
<<<<<<< HEAD
    # A per-thread override wins over the instance-wide namespace.
    Thread.current[:memcache_namespace] || @namespace
=======
    # NOTE(review): no default_namespace reader is defined in this file;
    # presumably it should expose @default_namespace - confirm.
    @namespace || default_namespace
>>>>>>> refactor
  end

  ##
  # Set the namespace for the current thread.
  #
  # NOTE(review): the shared +else+ branch below writes @namespace. On the
  # HEAD side nothing in this file ever stores a non-nil value in
  # Thread.current[:memcache_namespace], so the per-thread behaviour described
  # above looks lost in the merge - confirm.

  def namespace=(namespace)
<<<<<<< HEAD
    if namespace == @namespace
      Thread.current[:memcache_namespace] = nil
=======
    if default_namespace == namespace
      @namespace = nil
>>>>>>> refactor
    else
      @namespace = namespace
    end
  end

  ##
  # Returns a string representation of the cache object.

  def inspect
    "<MemCache: %d servers, %d buckets, ns: %p, ro: %p>" %
      [@servers.length, @buckets.length, namespace, @readonly]
  end

<<<<<<< HEAD
  ##
  # Returns whether there is at least one active server for the object.
  # ("Active" here only means the server list is non-empty.)

  def active?
    not @servers.empty?
  end

  ##
  # Returns whether or not the cache object was created read only.

  def readonly?
    @readonly
  end

  ##
  # Set the servers that the requests will be distributed between. Entries
  # can be either strings of the form "hostname:port" or
  # "hostname:port:weight" or MemCache::Server objects.
  #
  # Raises ArgumentError when a Server's owner has a different multithread
  # setting, and TypeError for any other kind of entry.

  def servers=(servers)
    # Create the server objects.
    @servers = servers.collect do |server|
      case server
      when String
        host, port, weight = server.split ':', 3
        port ||= DEFAULT_PORT
        weight ||= DEFAULT_WEIGHT
        Server.new self, host, port, weight
      when Server
        # NOTE(review): MemCache::Server as defined in this file exposes no
        # `memcache` reader, so this call looks like it would raise
        # NoMethodError - confirm.
        if server.memcache.multithread != @multithread then
          raise ArgumentError, "can't mix threaded and non-threaded servers"
        end
        server
      else
        raise TypeError, "cannot convert #{server.class} into MemCache::Server"
=======
  # Fetches +keys+. An Array is delegated to multi_get; a single key is read
  # from its server and unmarshalled. With opts[:expiry] the value is read
  # with +gets+ and, when present, written straight back with +cas+ using the
  # new expiry.
  # NOTE(review): the `server(key)` helper is not defined in this file.
  def get(keys, opts = {})
    raise 'opts must be hash' unless opts.kind_of?(Hash)

    if keys.kind_of?(Array)
      multi_get(keys, opts)
    else
      key = cache_key(keys)
      if opts[:expiry]
        value = server(key).gets(key)
        server(key).cas(key, value, value.memcache_cas, opts[:expiry]) if value
      else
        value = server(key).get(key, opts[:cas])
>>>>>>> refactor
      end
      # NOTE(review): the three lines below are shared by both conflict sides;
      # on the HEAD side they land inside the +collect+ block of servers=,
      # where +value+ and +opts+ are not defined.
      unmarshal(value, opts)
    end

<<<<<<< HEAD
    # Create an array of server buckets for weight selection of servers.
    # Each server is pushed +weight+ times.
    @buckets = []
    @servers.each do |server|
      server.weight.times { @buckets.push(server) }
    end
  end

  ##
  # Decrements the value for +key+ by +amount+ and returns the new value.
  # +key+ must already exist. If +key+ is not an integer, it is assumed to be
  # 0. +key+ can not be decremented below 0.
  #
  # Raises MemCacheError on a read-only cache.

  def decr(key, amount = 1)
    raise MemCacheError, "Update of readonly cache" if @readonly
    with_server(key) do |server, cache_key|
      cache_decr server, cache_key, amount
    end
  rescue TypeError => err
    # handle_error re-raises as MemCacheError.
    handle_error nil, err
  end

  ##
  # Retrieves +key+ from memcache. If +raw+ is false, the value will be
  # unmarshalled. Returns nil on a cache miss.

  def get(key, raw = false)
    with_server(key) do |server, cache_key|
      value = cache_get server, cache_key
      return nil if value.nil?
      value = Marshal.load(value) unless raw
      return value
    end
  rescue TypeError => err
    handle_error nil, err
  end

  ##
  # Retrieves multiple values from memcached in parallel, if possible.
  #
  # The memcached protocol supports the ability to retrieve multiple
  # keys in a single request. Pass in an array of keys to this method
  # and it will:
  #
  # 1. map the key to the appropriate memcached server
  # 2. send a single request to each server that has one or more key values
  #
  # A trailing Hash argument is taken as options; :raw => true skips
  # unmarshalling.
  #
  # Returns a hash of values.
  #
  #   cache["a"] = 1
  #   cache["b"] = 2
  #   cache.get_multi "a", "b" # => { "a" => 1, "b" => 2 }

  def get_multi(*keys)
    raise MemCacheError, 'No active servers' unless active?

    opts = keys.last.kind_of?(Hash) ? keys.pop : {}

    keys.flatten!
    key_count = keys.length # NOTE(review): never read afterwards
    cache_keys = {}
    server_keys = Hash.new { |h,k| h[k] = [] }

    # map keys to servers
    keys.each do |key|
      server, cache_key = request_setup key
      cache_keys[cache_key] = key
      server_keys[server] << cache_key
    end

    results = {}

    # One multi-get per server; +keys+ here is the per-server list of cache keys.
    server_keys.each do |server, keys|
      keys = keys.join ' '
      values = cache_get_multi server, keys
      values.each do |key, value|
        # Map the namespaced cache key back to the key the caller passed in.
        results[cache_keys[key]] = opts[:raw] ? value : Marshal.load(value)
=======
  end

  # Same as #get, but forces :raw so values are returned un-marshalled.
  def read(keys, opts = {})
    get(keys, opts.merge(:raw => true))
  end

  # Stores +value+ under +key+ and returns +value+. Reads :expiry, :flags and
  # (through marshal) :raw from +opts+.
  def set(key, value, opts = {})
    raise 'opts must be hash' unless opts.kind_of?(Hash)

    expiry = opts[:expiry] || default_expiry
    key    = cache_key(key)
    data   = marshal(value, opts)
    server(key).set(key, data, expiry, opts[:flags])
    value
  end

  # Same as #set, but forces :raw so +value+ is stored un-marshalled.
  def write(key, value, opts = {})
    set(key, value, opts.merge(:raw => true))
  end

  # Calls the server's +add+; returns +value+ when that call returns a truthy
  # result, otherwise the falsy result itself.
  def add(key, value, opts = {})
    raise 'opts must be hash' unless opts.kind_of?(Hash)

    expiry = opts[:expiry] || default_expiry
    key    = cache_key(key)
    data   = marshal(value, opts)
    server(key).add(key, data, expiry, opts[:flags]) && value
  end

  # Calls the server's +replace+; same return convention as #add.
  def replace(key, value, opts = {})
    raise 'opts must be hash' unless opts.kind_of?(Hash)

    expiry = opts[:expiry] || default_expiry
    key    = cache_key(key)
    data   = marshal(value, opts)
    server(key).replace(key, data, expiry, opts[:flags]) && value
  end

  # Calls the server's +cas+ with the token in opts[:cas]; same return
  # convention as #add.
  def cas(key, value, opts = {})
    raise 'opts must be hash' unless opts.kind_of?(Hash)

    expiry = opts[:expiry] || default_expiry
    key    = cache_key(key)
    data   = marshal(value, opts)
    server(key).cas(key, data, opts[:cas], expiry, opts[:flags]) && value
  end

  # Passes +value+ (not marshalled) to the server's +append+ for +key+.
  def append(key, value)
    key = cache_key(key)
    server(key).append(key, value)
  end

  # Passes +value+ (not marshalled) to the server's +prepend+ for +key+.
  def prepend(key, value)
    key = cache_key(key)
    server(key).prepend(key, value)
  end

  # Reads the raw value for +key+ and converts it with #to_i.
  def count(key)
    key = cache_key(key)
    server(key).get(key).to_i
  end

  # Increments +key+ by +amount+. When the first attempt returns a falsy
  # result, the key is added as '0' and the increment is tried once more.
  def incr(key, amount = 1)
    key = cache_key(key)
    server(key).incr(key, amount) || begin
      server(key).add(key, '0')
      server(key).incr(key, amount)
    end
  end

  # Decrements +key+ by +amount+. When the attempt returns a falsy result, the
  # key is added as '0' and 0 is returned.
  def decr(key, amount = 1)
    key = cache_key(key)
    server(key).decr(key, amount) || begin
      server(key).add(key, '0')
      0 # Cannot decrement below zero.
    end
  end

  # Reads +key+ with a cas token, yields the current value (nil on a miss)
  # and stores the block's result: via #cas when a value existed, via #add
  # otherwise.
  # NOTE(review): merge! mutates the caller's +opts+ hash.
  def update(key, opts = {})
    value = get(key, :cas => true)
    if value
      cas(key, yield(value), opts.merge!(:cas => value.memcache_cas))
    else
      add(key, yield(value), opts)
    end
  end

  # Returns the cached value, or adds and returns a new one. The new value
  # comes from the block (options in args[0]) or from args[0] (options in
  # args[1]). The final #get covers the case where the add did not succeed.
  def get_or_add(key, *args)
    # Pseudo-atomic get and update.
    if block_given?
      opts = args[0] || {}
      get(key) || add(key, yield, opts) || get(key)
    else
      opts = args[1] || {}
      get(key) || add(key, args[0], opts) || get(key)
    end
  end

  # Like #get_or_add, but stores with #set and does not re-read afterwards.
  def get_or_set(key, *args)
    if block_given?
      opts = args[0] || {}
      get(key) || set(key, yield, opts)
    else
      opts = args[1] || {}
      get(key) || set(key, args[0], opts)
    end
  end

  # Fetches +keys+ (stringified first), drops entries rejected by the
  # opts[:validation] callable, then yields the keys still missing. Each
  # key/value pair the block returns is recorded and, unless :disable or
  # :disable_write is set, written with #set (:overwrite) or #add.
  def get_some(keys, opts = {})
    keys = keys.collect {|key| key.to_s}

    records = opts[:disable] ? {} : self.get(keys, opts)
    if opts[:validation]
      records.delete_if do |key, value|
        not opts[:validation].call(key, value)
      end
    end

    keys_to_fetch = keys - records.keys
    method = opts[:overwrite] ? :set : :add
    if keys_to_fetch.any?
      yield(keys_to_fetch).each do |key, value|
        self.send(method, key, value, opts) unless opts[:disable] or opts[:disable_write]
        records[key] = value
>>>>>>> refactor
      end
    end

    # NOTE(review): this tail is shared by both conflict sides. The
    # refactor-side get_some builds +records+, not +results+, so
    # `return results` only makes sense for the HEAD-side get_multi.
    return results
  rescue TypeError => err
    handle_error nil, err
  end

<<<<<<< HEAD
  ##
  # Increments the value for +key+ by +amount+ and returns the new value.
  # +key+ must already exist. If +key+ is not an integer, it is assumed to be
  # 0.
  #
  # Raises MemCacheError on a read-only cache.

  def incr(key, amount = 1)
    raise MemCacheError, "Update of readonly cache" if @readonly
    with_server(key) do |server, cache_key|
      cache_incr server, cache_key, amount
    end
  rescue TypeError => err
    handle_error nil, err
  end

  ##
  # Add +key+ to the cache with value +value+ that expires in +expiry+
  # seconds. If +raw+ is true, +value+ will not be Marshalled.
  #
  # Warning: Readers should not call this method in the event of a cache miss;
  # see MemCache#add.

  def set(key, value, expiry = default_expiry, raw = false)
    raise MemCacheError, "Update of readonly cache" if @readonly
    with_server(key) do |server, cache_key|
      value = Marshal.dump value unless raw
      cache_store(:set, cache_key, value, expiry, server)
    end
  end

  ##
  # Add +key+ to the cache with value +value+ that expires in +expiry+
  # seconds, but only if +key+ does not already exist in the cache.
  # If +raw+ is true, +value+ will not be Marshalled.
  #
  # Readers should call this method in the event of a cache miss, not
  # MemCache#set or MemCache#[]=.

  def add(key, value, expiry = default_expiry, raw = false)
    raise MemCacheError, "Update of readonly cache" if @readonly
    with_server(key) do |server, cache_key|
      value = Marshal.dump value unless raw
      cache_store(:add, cache_key, value, expiry, server)
    end
  end

  ##
  # Removes +key+ from the cache in +expiry+ seconds.
  # Returns the server's raw response line.

  def delete(key, expiry = default_expiry)
    raise MemCacheError, "Update of readonly cache" if @readonly
    server, cache_key = request_setup key

    with_socket_management(server) do |socket|
      socket.write "delete #{cache_key} #{expiry}\r\n"
      socket.gets
    end
=======
  # Tries to take the lock for +key+ by adding a lock entry whose value is
  # this host's name. opts[:expiry] overrides LOCK_TIMEOUT.
  def lock(key, opts = {})
    # Returns false if the lock already exists.
    expiry = opts[:expiry] || LOCK_TIMEOUT
    add(lock_key(key), Socket.gethostname, :expiry => expiry, :raw => true)
  end

  # Releases the lock for +key+ by deleting its lock entry.
  def unlock(key)
    delete(lock_key(key))
  end

  # Polls #lock until it succeeds, yields, then unlocks unless opts[:keep].
  # With opts[:ignore] it returns without yielding when the lock is taken.
  # NOTE(review): the unlock is not in an +ensure+, so an exception raised by
  # the block leaves the lock entry in place until it expires.
  def with_lock(key, opts = {})
    until lock(key) do
      return if opts[:ignore]
      sleep(WRITE_LOCK_WAIT) # just wait
    end
    yield
    unlock(key) unless opts[:keep]
  end

  # Cache key under which the lock for +key+ is stored.
  def lock_key(key)
    "lock:#{key}"
  end

  # Returns the raw lock entry for +key+ (truthy while the lock is held).
  def locked?(key)
    get(lock_key(key), :raw => true)
  end

  # Deletes +key+ on the server that owns it.
  def delete(key)
    key = cache_key(key)
    server(key).delete(key)
>>>>>>> refactor
  end

  ##
  # Flush the cache from all memcache servers.
  #
  # Raises MemCacheError when there are no servers, when the cache is read
  # only, or when a server answers ERROR / SERVER_ERROR.
  #
  # NOTE(review): with_socket_management also calls @mutex.lock when
  # @multithread is set, and Ruby's Mutex is not reentrant, so in multithread
  # mode the nested lock below looks like it would raise ThreadError - confirm.

  def flush_all
    raise MemCacheError, 'No active servers' unless active?
    raise MemCacheError, "Update of readonly cache" if @readonly
    begin
      @mutex.lock if @multithread
      @servers.each do |server|
        with_socket_management(server) do |socket|
          socket.write "flush_all\r\n"
          result = socket.gets
          # $2 is the text that follows "ERROR" / "SERVER_ERROR".
          raise MemCacheError, $2.strip if result =~ /^(SERVER_)?ERROR(.*)/
        end
      end
    ensure
      @mutex.unlock if @multithread
    end
  end

  ##
  # Reset the connection to all memcache servers. This should be called if
  # there is a problem with a cache lookup that might have left the connection
  # in a corrupted state.
# NOTE(review): unresolved merge-conflict markers (HEAD vs. refactor) follow; this section cannot be parsed until they are resolved.
  def reset
<<<<<<< HEAD
    @servers.each { |server| server.close }
=======
    servers.each {|server| server.close}
>>>>>>> refactor
  end

  ##
  # Returns statistics for each memcached server. An explanation of the
  # statistics can be found in the memcached docs:
  #
  # http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt
  #
  # The result is keyed by "host:port". Servers that are not alive are
  # skipped. The 'version' value stays a String, 'rusage_user' and
  # 'rusage_system' become Floats, all-digit values become Integers and
  # anything else stays a String.
  #
  # Example:
  #
  #   >> pp CACHE.stats
  #   {"localhost:11211"=>
  #     {"bytes"=>4718,
  #      "pid"=>20188,
  #      "connection_structures"=>4,
  #      "time"=>1162278121,
  #      "pointer_size"=>32,
  #      "limit_maxbytes"=>67108864,
  #      "cmd_get"=>14532,
  #      "version"=>"1.2.0",
  #      "bytes_written"=>432583,
  #      "cmd_set"=>32,
  #      "get_misses"=>0,
  #      "total_connections"=>19,
  #      "curr_connections"=>3,
  #      "curr_items"=>4,
  #      "uptime"=>1557,
  #      "get_hits"=>14532,
  #      "total_items"=>32,
  #      "rusage_system"=>0.313952,
  #      "rusage_user"=>0.119981,
  #      "bytes_read"=>190619}}
  #   => nil

  def stats
    raise MemCacheError, "No active servers" unless active?
    server_stats = {}

    @servers.each do |server|
      next unless server.alive?

      with_socket_management(server) do |socket|
        value = nil # TODO: why is this line here?
        socket.write "stats\r\n"
        stats = {}
        # Read "STAT <name> <value>" lines until the terminating END line.
        while line = socket.gets do
          break if line == "END\r\n"
          if line =~ /^STAT ([\w]+) ([\w\.\:]+)/ then
            name, value = $1, $2
            stats[name] = case name
                          when 'version'
                            value
                          when 'rusage_user', 'rusage_system' then
                            # Handles "seconds:microseconds"; a value without
                            # ':' is treated as plain seconds.
                            seconds, microseconds = value.split(/:/, 2)
                            microseconds ||= 0
                            Float(seconds) + (Float(microseconds) / 1_000_000)
                          else
                            if value =~ /^\d+$/ then
                              value.to_i
                            else
                              value
                            end
                          end
          end
        end
        server_stats["#{server.host}:#{server.port}"] = stats
      end
    end

    server_stats
  end

  ##
  # Shortcut to get a value from the cache.

  alias [] get

  ##
  # Shortcut to save a value in the cache. This method does not set an
  # expiration on the entry. Use set to specify an explicit expiry.

  def []=(key, value)
    set(key, value)
  end

  # Everything below is protected, except when $TESTING is set.
  protected unless $TESTING

<<<<<<< HEAD
  ##
  # Create a key for the cache, incorporating the namespace qualifier if
  # requested.

  def make_cache_key(key)
=======
  # Fetches several keys at once: groups the cache keys by owning server,
  # issues one +get+ per server and returns a Hash keyed by the stringified
  # input keys, with values passed through unmarshal.
  # NOTE(review): the `server(key)` helper is not defined in this file.
  def multi_get(keys, opts = {})
    return {} if keys.empty?

    key_to_input_key = {}
    keys_by_server = Hash.new { |h,k| h[k] = [] }

    # Store keys by servers. Also store a mapping from cache key to input key.
    keys.each do |input_key|
      key    = cache_key(input_key)
      server = server(key)
      key_to_input_key[key] = input_key.to_s
      keys_by_server[server] << key
    end

    # Fetch and combine the results. Also, map the cache keys back to the input keys.
    results = {}
    keys_by_server.each do |server, keys|
      server.get(keys, opts[:cas]).each do |key, value|
        input_key = key_to_input_key[key]
        results[input_key] = unmarshal(value, opts)
      end
    end
    results
  end

  # Builds the key actually sent to the server (shared body below).
  def cache_key(key)
>>>>>>> refactor
    # Escape '%' as '%%' and ' ' as '%s'; a nil/false key is passed through.
    safe_key = key ? key.to_s.gsub(/%/, '%%').gsub(/ /, '%s') : key
    if namespace.nil? then
      safe_key
    else
      "#{namespace}:#{safe_key}"
    end
  end

  # Returns +value+ untouched when opts[:raw] is set, otherwise its
  # Marshal.dump.
  def marshal(value, opts = {})
    opts[:raw] ? value : Marshal.dump(value)
  end

  # Reverse of marshal: nil and raw values are returned as-is; otherwise the
  # value is loaded with Marshal and the memcache_flags / memcache_cas of the
  # raw value are copied onto the loaded object.
  #
  # NOTE(review): `rescue Exception` turns every failure into a nil return,
  # including non-StandardError exceptions; a corrupt entry is
  # indistinguishable from a cache miss.
  # NOTE(review): Marshal.load can instantiate arbitrary objects - only safe
  # when the cache contents are trusted.
  def unmarshal(value, opts = {})
    return value if value.nil? or opts[:raw]

    object = Marshal.load(value)
    object.memcache_flags = value.memcache_flags
    object.memcache_cas   = value.memcache_cas
    object
  rescue Exception => e
    nil
  end

  ##
  # Pick a server to handle the request based on a hash of the key.
  #
  # Raises ArgumentError for keys containing whitespace or longer than 250
  # characters, and MemCacheError when no live server is found within
  # +fallback+ attempts.

  def get_server_for_key(key)
    raise ArgumentError, "illegal character in key #{key.inspect}" if key =~ /\s/
    raise ArgumentError, "key too long #{key.inspect}" if key.length > 250
    raise MemCacheError, "No servers available" if @servers.empty?
    return @servers.first if @servers.length == 1

    hkey = hash_for key

    fallback.times do |try|
      # NOTE(review): Array#nitems only exists on Ruby 1.8 - confirm the
      # target Ruby version.
      server = @buckets[hkey % @buckets.nitems]
      return server if server.alive?
      # Re-hash with the attempt number to land on a different bucket.
      hkey += hash_for "#{try}#{key}"
    end

    raise MemCacheError, "No servers available"
  end

  ##
  # Returns an interoperable hash value for +key+. (I think, docs are
  # sketchy for down servers).
  # Computed as bits 16..30 of the CRC32 of +key+.

  def hash_for(key)
    (Zlib.crc32(key) >> 16) & 0x7fff
  end

  ##
  # Performs a raw decr for +cache_key+ from +server+. Returns nil if not
  # found.
def cache_decr(server, cache_key, amount) with_socket_management(server) do |socket| socket.write "decr #{cache_key} #{amount}\r\n" text = socket.gets return nil if text == "NOT_FOUND\r\n" return text.to_i end end ## # Fetches the raw data for +cache_key+ from +server+. Returns nil on cache # miss. def cache_get(server, cache_key) with_socket_management(server) do |socket| socket.write "get #{cache_key}\r\n" keyline = socket.gets # "VALUE <key> <flags> <bytes>\r\n" if keyline.nil? then server.close raise MemCacheError, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too end return nil if keyline == "END\r\n" unless keyline =~ /(\d+)\r/ then server.close raise MemCacheError, "unexpected response #{keyline.inspect}" end value = socket.read $1.to_i socket.read 2 # "\r\n" socket.gets # "END\r\n" return value end end ## # Fetches +cache_keys+ from +server+ using a multi-get. def cache_get_multi(server, cache_keys) with_socket_management(server) do |socket| values = {} socket.write "get #{cache_keys}\r\n" while keyline = socket.gets do return values if keyline == "END\r\n" unless keyline =~ /^VALUE (.+) (.+) (.+)/ then server.close raise MemCacheError, "unexpected response #{keyline.inspect}" end key, data_length = $1, $3 values[$1] = socket.read data_length.to_i socket.read(2) # "\r\n" end server.close raise MemCacheError, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too end end ## # Stores +value+ to +cache_keys+ on +server+ using +method+ (must be :set or :add). def cache_store(method, cache_key, value, expiry, server) raise MemCacheError, "unknown store method #{method}" unless [:set, :add].include?(method) command = "#{method} #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n" with_socket_management(server) do |socket| socket.write command result = socket.gets if result.nil? 
server.close raise MemCacheError, "lost connection to #{server.host}:#{server.port}" end if result =~ /^SERVER_ERROR (.*)/ server.close raise MemCacheError, "%s:\n%s" % [$1.strip, Marshal.restore(value).inspect] end result end end ## # Performs a raw incr for +cache_key+ from +server+. Returns nil if not # found. def cache_incr(server, cache_key, amount) with_socket_management(server) do |socket| socket.write "incr #{cache_key} #{amount}\r\n" text = socket.gets return nil if text == "NOT_FOUND\r\n" return text.to_i end end ## # Gets or creates a socket connected to the given server, and yields it # to the block. If a socket error (SocketError, SystemCallError, IOError) # or protocol error (MemCacheError) is raised by the block, closes the # socket, attempts to connect again, and retries the block (once). If # an error is again raised, reraises it as MemCacheError. # If unable to connect to the server (or if in the reconnect wait period), # raises MemCacheError - note that the socket connect code marks a server # dead for a timeout period, so retrying does not apply to connection attempt # failures (but does still apply to unexpectedly lost connections etc.). # Wraps the whole lot in mutex synchronization if @multithread is true. def with_socket_management(server, &block) @mutex.lock if @multithread retried = false begin socket = server.socket # Raise an IndexError to show this server is out of whack. # We'll catch it in higher-level code and attempt to restart the operation. raise IndexError, "No connection to server (#{server.status})" if socket.nil? block.call(socket) rescue MemCacheError, SocketError, SystemCallError, IOError => err handle_error(server, err) if retried || socket.nil? 
retried = true retry rescue Exception => err server.close raise err end ensure @mutex.unlock if @multithread end def with_server(key) retried = false begin server, cache_key = request_setup(key) yield server, cache_key rescue IndexError => e if !retried && @servers.size > 1 puts "Connection to server #{server.inspect} DIED! Retrying operation..." retried = true retry end handle_error(nil, e) end end ## # Handles +error+ from +server+. def handle_error(server, error) raise error if error.is_a?(MemCacheError) server.close if server new_error = MemCacheError.new error.message new_error.set_backtrace error.backtrace raise new_error end ## # Performs setup for making a request with +key+ from memcached. Returns # the server to fetch the key from and the complete key to use. def request_setup(key) raise MemCacheError, 'No active servers' unless active? cache_key = make_cache_key key server = get_server_for_key cache_key return server, cache_key end ## # This class represents a memcached server instance. class Server ## # The amount of time to wait to establish a connection with a memcached # server. If a connection cannot be established within this time limit, # the server will be marked as down. CONNECT_TIMEOUT = 1.0 unless defined? CONNECT_TIMEOUT ## # The amount of time to wait before attempting to re-establish a # connection with a server that is marked dead. RETRY_DELAY = 1.0 unless defined? RETRY_DELAY ## # The host the memcached server is running on. attr_reader :host ## # The port the memcached server is listening on. attr_reader :port ## # The weight given to the server. attr_reader :weight ## # The time of next retry if the connection is dead. attr_reader :retry ## # A text status string describing the state of the server. attr_reader :status ## # Create a new MemCache::Server object for the memcached instance # listening on the given host and port, weighted by the given weight. 
def initialize(memcache, host, port = DEFAULT_PORT, weight = DEFAULT_WEIGHT) raise ArgumentError, "No host specified" if host.nil? or host.empty? raise ArgumentError, "No port specified" if port.nil? or port.to_i.zero? @memcache = memcache @host = host @port = port.to_i @weight = weight.to_i @multithread = @memcache.multithread @mutex = Mutex.new @sock = nil @retry = nil @status = 'NOT CONNECTED' end ## # Return a string representation of the server object. def inspect "<MemCache::Server: %s:%d [%d] (%s)>" % [@host, @port, @weight, @status] end ## # Check whether the server connection is alive. This will cause the # socket to attempt to connect if it isn't already connected and or if # the server was previously marked as down and the retry time has # been exceeded. def alive? !!socket end ## # Try to connect to the memcached server targeted by this object. # Returns the connected socket object on success or nil on failure. def socket @mutex.lock if @multithread return @sock if @sock and not @sock.closed? @sock = nil # If the host was dead, don't retry for a while. return if @retry and @retry > Time.now # Attempt to connect if not already connected. begin @sock = timeout CONNECT_TIMEOUT do TCPSocket.new @host, @port end if Socket.constants.include? 'TCP_NODELAY' then @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 end @retry = nil @status = 'CONNECTED' rescue SocketError, SystemCallError, IOError, Timeout::Error => err mark_dead err.message end return @sock ensure @mutex.unlock if @multithread end ## # Close the connection to the memcached server targeted by this # object. The server is not considered dead. def close @mutex.lock if @multithread @sock.close if @sock && !@sock.closed? @sock = nil @retry = nil @status = "NOT CONNECTED" ensure @mutex.unlock if @multithread end private ## # Mark the server as dead and close its socket. def mark_dead(reason = "Unknown error") @sock.close if @sock && !@sock.closed? 
@sock = nil @retry = Time.now + RETRY_DELAY @status = sprintf "DEAD: %s, will retry at %s", reason, @retry end end ## # Base MemCache exception class. class MemCacheError < RuntimeError; end class CachePool attr_reader :fallback def initialize @cache_by_scope = {} @cache_by_scope[:default] = Memcache.new(:server => Memcache::LocalServer) @fallback = :default end def include?(scope) @cache_by_scope.include?(scope.to_sym) end def fallback=(scope) @fallback = scope.to_sym end def [](scope) @cache_by_scope[scope.to_sym] || @cache_by_scope[fallback] end def []=(scope, cache) @cache_by_scope[scope.to_sym] = cache end def reset @cache_by_scope.values.each {|c| c.reset} end end def self.pool @@cache_pool ||= CachePool.new end end <<<<<<< HEAD ======= # Add flags and cas class Object attr_accessor :memcache_flags, :memcache_cas end >>>>>>> refactor