lib/dalli/client.rb in dalli-2.5.0 vs lib/dalli/client.rb in dalli-2.6.0
- old
+ new
@@ -20,17 +20,18 @@
# - :namespace - prepend each key with this value to provide simple namespacing.
# - :failover - if a server is down, look for and store values on another server in the ring. Default: true.
# - :threadsafe - ensure that only one thread is actively using a socket at a time. Default: true.
# - :expires_in - default TTL in seconds if you do not pass TTL as a parameter to an individual operation, defaults to 0 or forever
# - :compress - defaults to false, if true Dalli will compress values larger than 1024 bytes before
+ # - :serializer - defaults to Marshal
# sending them to memcached.
+ # - :compressor - defaults to zlib
#
def initialize(servers=nil, options={})
@servers = servers || env_servers || '127.0.0.1:11211'
@options = normalize_options(options)
@ring = nil
- @servers_in_use = nil
end
#
# The standard memcached instruction set
#
@@ -58,36 +59,83 @@
def get_multi(*keys)
return {} if keys.empty?
options = nil
options = keys.pop if keys.last.is_a?(Hash) || keys.last.nil?
ring.lock do
- self.servers_in_use = Set.new
+ begin
+ servers = self.servers_in_use = Set.new
- keys.flatten.each do |key|
- begin
- perform(:getkq, key)
- rescue DalliError, NetworkError => e
- Dalli.logger.debug { e.inspect }
- Dalli.logger.debug { "unable to get key #{key}" }
+ keys.flatten.each do |key|
+ begin
+ perform(:getkq, key)
+ rescue DalliError, NetworkError => e
+ Dalli.logger.debug { e.inspect }
+ Dalli.logger.debug { "unable to get key #{key}" }
+ end
end
- end
- values = {}
- servers_in_use.each do |server|
- next unless server.alive?
- begin
- server.request(:noop).each_pair do |key, value|
- values[key_without_namespace(key)] = value
+ values = {}
+ return values if servers.empty?
+
+ servers.each do |server|
+ next unless server.alive?
+ begin
+ server.multi_response_start
+ rescue DalliError, NetworkError => e
+ Dalli.logger.debug { e.inspect }
+ Dalli.logger.debug { "results from this server will be missing" }
+ servers.delete(server)
end
- rescue DalliError, NetworkError => e
- Dalli.logger.debug { e.inspect }
- Dalli.logger.debug { "results from this server will be missing" }
end
+
+ start = Time.now
+ loop do
+ # remove any dead servers
+ servers.delete_if { |s| s.sock.nil? }
+ break if servers.empty?
+
+ # calculate remaining timeout
+ elapsed = Time.now - start
+ timeout = servers.first.options[:socket_timeout]
+ if elapsed > timeout
+ readable = nil
+ else
+ sockets = servers.map(&:sock)
+ readable, _ = IO.select(sockets, nil, nil, timeout - elapsed)
+ end
+
+ if readable.nil?
+ # no response within timeout; abort pending connections
+ servers.each do |server|
+ puts "Abort!"
+ server.multi_response_abort
+ end
+ break
+
+ else
+ readable.each do |sock|
+ server = sock.server
+
+ begin
+ server.multi_response_nonblock.each do |key, value|
+ values[key_without_namespace(key)] = value
+ end
+
+ if server.multi_response_completed?
+ servers.delete(server)
+ end
+ rescue NetworkError
+ servers.delete(server)
+ end
+ end
+ end
+ end
+
+ values
+ ensure
+ self.servers_in_use = nil
end
- values
end
- ensure
- self.servers_in_use = nil
end
def fetch(key, ttl=nil, options=nil)
ttl ||= @options[:expires_in].to_i
val = get(key, options)