lib/dalli/client.rb in dalli-2.6.3 vs lib/dalli/client.rb in dalli-2.6.4
- old
+ new
@@ -65,86 +65,102 @@
##
# Fetch multiple keys efficiently.
# Returns a hash of { 'key' => 'value', 'key2' => 'value1' }
def get_multi(*keys)
- return {} if keys.empty?
- options = nil
- options = keys.pop if keys.last.is_a?(Hash) || keys.last.nil?
- ring.lock do
- begin
- servers = self.servers_in_use = Set.new
-
- keys.flatten.each do |key|
- begin
- perform(:getkq, key)
- rescue DalliError, NetworkError => e
- Dalli.logger.debug { e.inspect }
- Dalli.logger.debug { "unable to get key #{key}" }
+ perform do
+ return {} if keys.empty?
+ options = nil
+ options = keys.pop if keys.last.is_a?(Hash) || keys.last.nil?
+ ring.lock do
+ begin
+ mapped_keys = keys.flatten.map {|a| validate_key(a.to_s)}
+ groups = mapped_keys.flatten.group_by do |key|
+ begin
+ ring.server_for_key(key)
+ rescue Dalli::RingError
+ Dalli.logger.debug { "unable to get key #{key}" }
+ nil
+ end
end
- end
+ if unfound_keys = groups.delete(nil)
+ Dalli.logger.debug { "unable to get keys for #{unfound_keys.length} keys because no matching server was found" }
+ end
- values = {}
- return values if servers.empty?
-
- servers.each do |server|
- next unless server.alive?
- begin
- server.multi_response_start
- rescue DalliError, NetworkError => e
- Dalli.logger.debug { e.inspect }
- Dalli.logger.debug { "results from this server will be missing" }
- servers.delete(server)
+ groups.each do |server, keys_for_server|
+ begin
+ # TODO: do this with the perform chokepoint?
+ # But given the fact that fetching the response doesn't take place
+ # in that slot it's misleading anyway. Need to move all of this method
+ # into perform to be meaningful
+ server.request(:send_multiget, keys_for_server)
+ rescue DalliError, NetworkError => e
+ Dalli.logger.debug { e.inspect }
+ Dalli.logger.debug { "unable to get keys for server #{server.hostname}:#{server.port}" }
+ end
end
- end
- start = Time.now
- loop do
- # remove any dead servers
- servers.delete_if { |s| s.sock.nil? }
- break if servers.empty?
+ servers = groups.keys
+ values = {}
+ return values if servers.empty?
- # calculate remaining timeout
- elapsed = Time.now - start
- timeout = servers.first.options[:socket_timeout]
- if elapsed > timeout
- readable = nil
- else
- sockets = servers.map(&:sock)
- readable, _ = IO.select(sockets, nil, nil, timeout - elapsed)
+ servers.each do |server|
+ next unless server.alive?
+ begin
+ server.multi_response_start
+ rescue DalliError, NetworkError => e
+ Dalli.logger.debug { e.inspect }
+ Dalli.logger.debug { "results from this server will be missing" }
+ servers.delete(server)
+ end
end
- if readable.nil?
- # no response within timeout; abort pending connections
- servers.each do |server|
- puts "Abort!"
- server.multi_response_abort
+ start = Time.now
+ loop do
+ # remove any dead servers
+ servers.delete_if { |s| s.sock.nil? }
+ break if servers.empty?
+
+ # calculate remaining timeout
+ elapsed = Time.now - start
+ timeout = servers.first.options[:socket_timeout]
+ if elapsed > timeout
+ readable = nil
+ else
+ sockets = servers.map(&:sock)
+ readable, _ = IO.select(sockets, nil, nil, timeout - elapsed)
end
- break
- else
- readable.each do |sock|
- server = sock.server
+ if readable.nil?
+ # no response within timeout; abort pending connections
+ servers.each do |server|
+ Dalli.logger.debug { "memcached at #{server.name} did not response within timeout" }
+ server.multi_response_abort
+ end
+ break
- begin
- server.multi_response_nonblock.each do |key, value|
- values[key_without_namespace(key)] = value
- end
+ else
+ readable.each do |sock|
+ server = sock.server
- if server.multi_response_completed?
+ begin
+ server.multi_response_nonblock.each do |key, value|
+ values[key_without_namespace(key)] = value
+ end
+
+ if server.multi_response_completed?
+ servers.delete(server)
+ end
+ rescue NetworkError
servers.delete(server)
end
- rescue NetworkError
- servers.delete(server)
end
end
end
- end
- values
- ensure
- self.servers_in_use = nil
+ values
+ end
end
end
end
def fetch(key, ttl=nil, options=nil)
@@ -319,30 +335,24 @@
end, @options
)
end
# Chokepoint method for instrumentation
- def perform(op, key, *args)
+ def perform(*all_args, &blk)
+ return blk.call if blk
+ op, key, *args = *all_args
+
key = key.to_s
key = validate_key(key)
begin
server = ring.server_for_key(key)
ret = server.request(op, key, *args)
- servers_in_use << server if servers_in_use
ret
rescue NetworkError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { "retrying request with new server" }
retry
end
- end
-
- def servers_in_use
- Thread.current[:"#{object_id}-servers"]
- end
-
- def servers_in_use=(value)
- Thread.current[:"#{object_id}-servers"] = value
end
def validate_key(key)
raise ArgumentError, "key cannot be blank" if !key || key.length == 0
key = key_with_namespace(key)