lib/memcache.rb in memcache-client-1.7.0 vs lib/memcache.rb in memcache-client-1.7.1
- old
+ new
@@ -31,11 +31,11 @@
class MemCache
##
# The version of MemCache you are using.
- VERSION = '1.7.0'
+ VERSION = '1.7.1'
##
# Default options for the cache object.
DEFAULT_OPTIONS = {
@@ -221,11 +221,11 @@
# Retrieves +key+ from memcache. If +raw+ is false, the value will be
# unmarshalled.
def get(key, raw = false)
with_server(key) do |server, cache_key|
- logger.debug { "get #{key} from #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger
+ logger.debug { "get #{key} from #{server.inspect}" } if logger
value = cache_get server, cache_key
return nil if value.nil?
value = Marshal.load value unless raw
return value
end
@@ -331,16 +331,15 @@
def set(key, value, expiry = 0, raw = false)
raise MemCacheError, "Update of readonly cache" if @readonly
with_server(key) do |server, cache_key|
value = Marshal.dump value unless raw
- data = value.to_s
- logger.debug { "set #{key} to #{server.inspect}: #{data.size}" } if logger
+ logger.debug { "set #{key} to #{server.inspect}: #{value.to_s.size}" } if logger
- raise MemCacheError, "Value too large, memcached can only store 1MB of data per key" if data.size > ONE_MB
+ raise MemCacheError, "Value too large, memcached can only store 1MB of data per key" if value.to_s.size > ONE_MB
- command = "set #{cache_key} 0 #{expiry} #{data.size}#{noreply}\r\n#{data}\r\n"
+ command = "set #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
with_socket_management(server) do |socket|
socket.write command
break nil if @no_reply
result = socket.gets
@@ -378,15 +377,14 @@
(value, token) = gets(key, raw)
return nil unless value
updated = yield value
with_server(key) do |server, cache_key|
- logger.debug { "cas #{key} to #{server.inspect}: #{data.size}" } if logger
value = Marshal.dump updated unless raw
- data = value.to_s
- command = "cas #{cache_key} 0 #{expiry} #{value.size} #{token}#{noreply}\r\n#{value}\r\n"
+ logger.debug { "cas #{key} to #{server.inspect}: #{value.to_s.size}" } if logger
+ command = "cas #{cache_key} 0 #{expiry} #{value.to_s.size} #{token}#{noreply}\r\n#{value}\r\n"
with_socket_management(server) do |socket|
socket.write command
break nil if @no_reply
result = socket.gets
@@ -718,11 +716,11 @@
end
end
def gets(key, raw = false)
with_server(key) do |server, cache_key|
- logger.debug { "gets #{key} from #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger
+ logger.debug { "gets #{key} from #{server.inspect}" } if logger
result = with_socket_management(server) do |socket|
socket.write "gets #{cache_key}\r\n"
keyline = socket.gets # "VALUE <key> <flags> <bytes> <cas token>\r\n"
if keyline.nil? then
@@ -820,11 +818,11 @@
raise IndexError, "No connection to server (#{server.status})" if socket.nil?
block.call(socket)
- rescue SocketError, Timeout::Error => err
+ rescue SocketError, Errno::EAGAIN, Timeout::Error => err
logger.warn { "Socket failure: #{err.message}" } if logger
server.mark_dead(err)
handle_error(server, err)
rescue MemCacheError, SystemCallError, IOError => err
@@ -920,17 +918,10 @@
# This class represents a memcached server instance.
class Server
##
- # The amount of time to wait to establish a connection with a memcached
- # server. If a connection cannot be established within this time limit,
- # the server will be marked as down.
-
- CONNECT_TIMEOUT = 0.25
-
- ##
# The amount of time to wait before attempting to re-establish a
# connection with a server that is marked dead.
RETRY_DELAY = 30.0
@@ -1009,25 +1000,49 @@
# If the host was dead, don't retry for a while.
return if @retry and @retry > Time.now
# Attempt to connect if not already connected.
begin
- @sock = @timeout ? TCPTimeoutSocket.new(@host, @port, @timeout) : TCPSocket.new(@host, @port)
-
- if Socket.constants.include? 'TCP_NODELAY' then
- @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
- end
+ @sock = connect_to(@host, @port, @timeout)
+ @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
@retry = nil
@status = 'CONNECTED'
- rescue SocketError, SystemCallError, IOError, Timeout::Error => err
+ rescue SocketError, SystemCallError, IOError => err
logger.warn { "Unable to open socket: #{err.class.name}, #{err.message}" } if logger
mark_dead err
end
return @sock
end
+ def connect_to(host, port, timeout=nil)
+ addr = Socket.getaddrinfo(host, nil)
+ sock = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
+
+ if timeout
+ secs = Integer(timeout)
+ usecs = Integer((timeout - secs) * 1_000_000)
+ optval = [secs, usecs].pack("l_2")
+ sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
+ sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
+
+ # Socket timeouts don't work for more complex IO operations
+ # like gets which lay on top of read. We need to fall back to
+ # the standard Timeout mechanism.
+ sock.instance_eval <<-EOR
+ alias :blocking_gets :gets
+ def gets
+ MemCacheTimer.timeout(#{timeout}) do
+ self.blocking_gets
+ end
+ end
+ EOR
+ end
+ sock.connect(Socket.pack_sockaddr_in(port, addr[0][3]))
+ sock
+ end
+
##
# Close the connection to the memcached server targeted by this
# object. The server is not considered dead.
def close
@@ -1057,55 +1072,10 @@
class MemCacheError < RuntimeError; end
end
-# TCPSocket facade class which implements timeouts.
-class TCPTimeoutSocket
-
- def initialize(host, port, timeout)
- MemCacheTimer.timeout(MemCache::Server::CONNECT_TIMEOUT) do
- @sock = TCPSocket.new(host, port)
- @len = timeout
- end
- end
-
- def write(*args)
- MemCacheTimer.timeout(@len) do
- @sock.write(*args)
- end
- end
-
- def gets(*args)
- MemCacheTimer.timeout(@len) do
- @sock.gets(*args)
- end
- end
-
- def read(*args)
- MemCacheTimer.timeout(@len) do
- @sock.read(*args)
- end
- end
-
- def _socket
- @sock
- end
-
- def method_missing(meth, *args)
- @sock.__send__(meth, *args)
- end
-
- def closed?
- @sock.closed?
- end
-
- def close
- @sock.close
- end
-end
-
module Continuum
POINTS_PER_SERVER = 160 # this is the default in libmemcached
# Find the closest index in Continuum with value <= the given value
def self.binary_search(ary, value, &block)
@@ -1140,5 +1110,6 @@
def inspect
"<#{value}, #{server.host}:#{server.port}>"
end
end
end
+require 'continuum_native'