lib/dalli/client.rb in dalli-2.6.3 vs lib/dalli/client.rb in dalli-2.6.4

- old
+ new

@@ -65,86 +65,102 @@ ## # Fetch multiple keys efficiently. # Returns a hash of { 'key' => 'value', 'key2' => 'value1' } def get_multi(*keys) - return {} if keys.empty? - options = nil - options = keys.pop if keys.last.is_a?(Hash) || keys.last.nil? - ring.lock do - begin - servers = self.servers_in_use = Set.new - - keys.flatten.each do |key| - begin - perform(:getkq, key) - rescue DalliError, NetworkError => e - Dalli.logger.debug { e.inspect } - Dalli.logger.debug { "unable to get key #{key}" } + perform do + return {} if keys.empty? + options = nil + options = keys.pop if keys.last.is_a?(Hash) || keys.last.nil? + ring.lock do + begin + mapped_keys = keys.flatten.map {|a| validate_key(a.to_s)} + groups = mapped_keys.flatten.group_by do |key| + begin + ring.server_for_key(key) + rescue Dalli::RingError + Dalli.logger.debug { "unable to get key #{key}" } + nil + end end - end + if unfound_keys = groups.delete(nil) + Dalli.logger.debug { "unable to get keys for #{unfound_keys.length} keys because no matching server was found" } + end - values = {} - return values if servers.empty? - - servers.each do |server| - next unless server.alive? - begin - server.multi_response_start - rescue DalliError, NetworkError => e - Dalli.logger.debug { e.inspect } - Dalli.logger.debug { "results from this server will be missing" } - servers.delete(server) + groups.each do |server, keys_for_server| + begin + # TODO: do this with the perform chokepoint? + # But given the fact that fetching the response doesn't take place + # in that slot it's misleading anyway. Need to move all of this method + # into perform to be meaningful + server.request(:send_multiget, keys_for_server) + rescue DalliError, NetworkError => e + Dalli.logger.debug { e.inspect } + Dalli.logger.debug { "unable to get keys for server #{server.hostname}:#{server.port}" } + end end - end - start = Time.now - loop do - # remove any dead servers - servers.delete_if { |s| s.sock.nil? } - break if servers.empty? + servers = groups.keys + values = {} + return values if servers.empty? - # calculate remaining timeout - elapsed = Time.now - start - timeout = servers.first.options[:socket_timeout] - if elapsed > timeout - readable = nil - else - sockets = servers.map(&:sock) - readable, _ = IO.select(sockets, nil, nil, timeout - elapsed) + servers.each do |server| + next unless server.alive? + begin + server.multi_response_start + rescue DalliError, NetworkError => e + Dalli.logger.debug { e.inspect } + Dalli.logger.debug { "results from this server will be missing" } + servers.delete(server) + end end - if readable.nil? - # no response within timeout; abort pending connections - servers.each do |server| - puts "Abort!" - server.multi_response_abort + start = Time.now + loop do + # remove any dead servers + servers.delete_if { |s| s.sock.nil? } + break if servers.empty? + + # calculate remaining timeout + elapsed = Time.now - start + timeout = servers.first.options[:socket_timeout] + if elapsed > timeout + readable = nil + else + sockets = servers.map(&:sock) + readable, _ = IO.select(sockets, nil, nil, timeout - elapsed) end - break - else - readable.each do |sock| - server = sock.server + if readable.nil? + # no response within timeout; abort pending connections + servers.each do |server| + Dalli.logger.debug { "memcached at #{server.name} did not response within timeout" } + server.multi_response_abort + end + break - begin - server.multi_response_nonblock.each do |key, value| - values[key_without_namespace(key)] = value - end + else + readable.each do |sock| + server = sock.server - if server.multi_response_completed? + begin + server.multi_response_nonblock.each do |key, value| + values[key_without_namespace(key)] = value + end + + if server.multi_response_completed? + servers.delete(server) + end + rescue NetworkError servers.delete(server) end - rescue NetworkError - servers.delete(server) end end end - end - values - ensure - self.servers_in_use = nil + values + end end end end def fetch(key, ttl=nil, options=nil) @@ -319,30 +335,24 @@ end, @options ) end # Chokepoint method for instrumentation - def perform(op, key, *args) + def perform(*all_args, &blk) + return blk.call if blk + op, key, *args = *all_args + key = key.to_s key = validate_key(key) begin server = ring.server_for_key(key) ret = server.request(op, key, *args) - servers_in_use << server if servers_in_use ret rescue NetworkError => e Dalli.logger.debug { e.inspect } Dalli.logger.debug { "retrying request with new server" } retry end - end - - def servers_in_use - Thread.current[:"#{object_id}-servers"] - end - - def servers_in_use=(value) - Thread.current[:"#{object_id}-servers"] = value end def validate_key(key) raise ArgumentError, "key cannot be blank" if !key || key.length == 0 key = key_with_namespace(key)