lib/dalli/client.rb in dalli-2.5.0 vs lib/dalli/client.rb in dalli-2.6.0

- old
+ new

@@ -20,17 +20,18 @@ # - :namespace - prepend each key with this value to provide simple namespacing. # - :failover - if a server is down, look for and store values on another server in the ring. Default: true. # - :threadsafe - ensure that only one thread is actively using a socket at a time. Default: true. # - :expires_in - default TTL in seconds if you do not pass TTL as a parameter to an individual operation, defaults to 0 or forever # - :compress - defaults to false, if true Dalli will compress values larger than 1024 bytes before + # - :serializer - defaults to Marshal # sending them to memcached. + # - :compressor - defaults to zlib # def initialize(servers=nil, options={}) @servers = servers || env_servers || '127.0.0.1:11211' @options = normalize_options(options) @ring = nil - @servers_in_use = nil end # # The standard memcached instruction set # @@ -58,36 +59,83 @@ def get_multi(*keys) return {} if keys.empty? options = nil options = keys.pop if keys.last.is_a?(Hash) || keys.last.nil? ring.lock do - self.servers_in_use = Set.new + begin + servers = self.servers_in_use = Set.new - keys.flatten.each do |key| - begin - perform(:getkq, key) - rescue DalliError, NetworkError => e - Dalli.logger.debug { e.inspect } - Dalli.logger.debug { "unable to get key #{key}" } + keys.flatten.each do |key| + begin + perform(:getkq, key) + rescue DalliError, NetworkError => e + Dalli.logger.debug { e.inspect } + Dalli.logger.debug { "unable to get key #{key}" } + end end - end - values = {} - servers_in_use.each do |server| - next unless server.alive? - begin - server.request(:noop).each_pair do |key, value| - values[key_without_namespace(key)] = value + values = {} + return values if servers.empty? + + servers.each do |server| + next unless server.alive? + begin + server.multi_response_start + rescue DalliError, NetworkError => e + Dalli.logger.debug { e.inspect } + Dalli.logger.debug { "results from this server will be missing" } + servers.delete(server) end - rescue DalliError, NetworkError => e - Dalli.logger.debug { e.inspect } - Dalli.logger.debug { "results from this server will be missing" } end + + start = Time.now + loop do + # remove any dead servers + servers.delete_if { |s| s.sock.nil? } + break if servers.empty? + + # calculate remaining timeout + elapsed = Time.now - start + timeout = servers.first.options[:socket_timeout] + if elapsed > timeout + readable = nil + else + sockets = servers.map(&:sock) + readable, _ = IO.select(sockets, nil, nil, timeout - elapsed) + end + + if readable.nil? + # no response within timeout; abort pending connections + servers.each do |server| + puts "Abort!" + server.multi_response_abort + end + break + + else + readable.each do |sock| + server = sock.server + + begin + server.multi_response_nonblock.each do |key, value| + values[key_without_namespace(key)] = value + end + + if server.multi_response_completed? + servers.delete(server) + end + rescue NetworkError + servers.delete(server) + end + end + end + end + + values + ensure + self.servers_in_use = nil end - values end - ensure - self.servers_in_use = nil end def fetch(key, ttl=nil, options=nil) ttl ||= @options[:expires_in].to_i val = get(key, options)