lib/memcache.rb in memcache-client-1.7.0 vs lib/memcache.rb in memcache-client-1.7.1

- old
+ new

@@ -31,11 +31,11 @@ class MemCache ## # The version of MemCache you are using. - VERSION = '1.7.0' + VERSION = '1.7.1' ## # Default options for the cache object. DEFAULT_OPTIONS = { @@ -221,11 +221,11 @@ # Retrieves +key+ from memcache. If +raw+ is false, the value will be # unmarshalled. def get(key, raw = false) with_server(key) do |server, cache_key| - logger.debug { "get #{key} from #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger + logger.debug { "get #{key} from #{server.inspect}" } if logger value = cache_get server, cache_key return nil if value.nil? value = Marshal.load value unless raw return value end @@ -331,16 +331,15 @@ def set(key, value, expiry = 0, raw = false) raise MemCacheError, "Update of readonly cache" if @readonly with_server(key) do |server, cache_key| value = Marshal.dump value unless raw - data = value.to_s - logger.debug { "set #{key} to #{server.inspect}: #{data.size}" } if logger + logger.debug { "set #{key} to #{server.inspect}: #{value.to_s.size}" } if logger - raise MemCacheError, "Value too large, memcached can only store 1MB of data per key" if data.size > ONE_MB + raise MemCacheError, "Value too large, memcached can only store 1MB of data per key" if value.to_s.size > ONE_MB - command = "set #{cache_key} 0 #{expiry} #{data.size}#{noreply}\r\n#{data}\r\n" + command = "set #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n" with_socket_management(server) do |socket| socket.write command break nil if @no_reply result = socket.gets @@ -378,15 +377,14 @@ (value, token) = gets(key, raw) return nil unless value updated = yield value with_server(key) do |server, cache_key| - logger.debug { "cas #{key} to #{server.inspect}: #{data.size}" } if logger value = Marshal.dump updated unless raw - data = value.to_s - command = "cas #{cache_key} 0 #{expiry} #{value.size} #{token}#{noreply}\r\n#{value}\r\n" + logger.debug { "cas #{key} to #{server.inspect}: #{value.to_s.size}" } if logger + command = "cas #{cache_key} 0 #{expiry} #{value.to_s.size} #{token}#{noreply}\r\n#{value}\r\n" with_socket_management(server) do |socket| socket.write command break nil if @no_reply result = socket.gets @@ -718,11 +716,11 @@ end end def gets(key, raw = false) with_server(key) do |server, cache_key| - logger.debug { "gets #{key} from #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger + logger.debug { "gets #{key} from #{server.inspect}" } if logger result = with_socket_management(server) do |socket| socket.write "gets #{cache_key}\r\n" keyline = socket.gets # "VALUE <key> <flags> <bytes> <cas token>\r\n" if keyline.nil? then @@ -820,11 +818,11 @@ raise IndexError, "No connection to server (#{server.status})" if socket.nil? block.call(socket) - rescue SocketError, Timeout::Error => err + rescue SocketError, Errno::EAGAIN, Timeout::Error => err logger.warn { "Socket failure: #{err.message}" } if logger server.mark_dead(err) handle_error(server, err) rescue MemCacheError, SystemCallError, IOError => err @@ -920,17 +918,10 @@ # This class represents a memcached server instance. class Server ## - # The amount of time to wait to establish a connection with a memcached - # server. If a connection cannot be established within this time limit, - # the server will be marked as down. - - CONNECT_TIMEOUT = 0.25 - - ## # The amount of time to wait before attempting to re-establish a # connection with a server that is marked dead. RETRY_DELAY = 30.0 @@ -1009,25 +1000,49 @@ # If the host was dead, don't retry for a while. return if @retry and @retry > Time.now # Attempt to connect if not already connected. begin - @sock = @timeout ? TCPTimeoutSocket.new(@host, @port, @timeout) : TCPSocket.new(@host, @port) - - if Socket.constants.include? 'TCP_NODELAY' then - @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 - end + @sock = connect_to(@host, @port, @timeout) + @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 @retry = nil @status = 'CONNECTED' - rescue SocketError, SystemCallError, IOError, Timeout::Error => err + rescue SocketError, SystemCallError, IOError => err logger.warn { "Unable to open socket: #{err.class.name}, #{err.message}" } if logger mark_dead err end return @sock end + def connect_to(host, port, timeout=nil) + addr = Socket.getaddrinfo(host, nil) + sock = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0) + + if timeout + secs = Integer(timeout) + usecs = Integer((timeout - secs) * 1_000_000) + optval = [secs, usecs].pack("l_2") + sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval + sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval + + # Socket timeouts don't work for more complex IO operations + # like gets which lay on top of read. We need to fall back to + # the standard Timeout mechanism. + sock.instance_eval <<-EOR + alias :blocking_gets :gets + def gets + MemCacheTimer.timeout(#{timeout}) do + self.blocking_gets + end + end + EOR + end + sock.connect(Socket.pack_sockaddr_in(port, addr[0][3])) + sock + end + ## # Close the connection to the memcached server targeted by this # object. The server is not considered dead. def close @@ -1057,55 +1072,10 @@ class MemCacheError < RuntimeError; end end -# TCPSocket facade class which implements timeouts. -class TCPTimeoutSocket - - def initialize(host, port, timeout) - MemCacheTimer.timeout(MemCache::Server::CONNECT_TIMEOUT) do - @sock = TCPSocket.new(host, port) - @len = timeout - end - end - - def write(*args) - MemCacheTimer.timeout(@len) do - @sock.write(*args) - end - end - - def gets(*args) - MemCacheTimer.timeout(@len) do - @sock.gets(*args) - end - end - - def read(*args) - MemCacheTimer.timeout(@len) do - @sock.read(*args) - end - end - - def _socket - @sock - end - - def method_missing(meth, *args) - @sock.__send__(meth, *args) - end - - def closed? - @sock.closed? - end - - def close - @sock.close - end -end - module Continuum POINTS_PER_SERVER = 160 # this is the default in libmemcached # Find the closest index in Continuum with value <= the given value def self.binary_search(ary, value, &block) @@ -1140,5 +1110,6 @@ def inspect "<#{value}, #{server.host}:#{server.port}>" end end end +require 'continuum_native'