lib/kestrel/client.rb in kestrel-client-0.6.1 vs lib/kestrel/client.rb in kestrel-client-0.6.4

- old
+ new

@@ -1,9 +1,10 @@ +require 'forwardable' + module Kestrel - class Client < Memcached + class Client require 'kestrel/client/stats_helper' - require 'kestrel/client/retry_helper' autoload :Proxy, 'kestrel/client/proxy' autoload :Envelope, 'kestrel/client/envelope' autoload :Blocking, 'kestrel/client/blocking' autoload :Partitioning, 'kestrel/client/partitioning' @@ -33,84 +34,69 @@ Memcached::MemoryAllocationFailure, Memcached::ReadFailure, Memcached::ServerError, Memcached::SystemError, Memcached::UnknownReadFailure, - Memcached::WriteFailure + Memcached::WriteFailure, + Memcached::NotFound ] + extend Forwardable include StatsHelper - include RetryHelper + attr_accessor :servers, :options + attr_reader :current_queue, :kestrel_options, :current_server + def_delegators :@write_client, :add, :append, :cas, :decr, :incr, :get_orig, :prepend + def initialize(*servers) opts = servers.last.is_a?(Hash) ? servers.pop : {} opts = DEFAULT_OPTIONS.merge(opts) @kestrel_options = extract_kestrel_options!(opts) @default_get_timeout = kestrel_options[:get_timeout_ms] @gets_per_server = kestrel_options[:gets_per_server] @exception_retry_limit = kestrel_options[:exception_retry_limit] @counter = 0 + @shuffle = true # we handle our own retries so that we can apply different # policies to sets and gets, so set memcached limit to 0 opts[:exception_retry_limit] = 0 opts[:distribution] = :random # force random distribution - super Array(servers).flatten.compact, opts - end + self.servers = Array(servers).flatten.compact + self.options = opts - - attr_reader :current_queue, :kestrel_options - - - # Memcached overrides - - %w(add append cas decr incr get_orig prepend).each do |m| - undef_method m + @server_count = self.servers.size # Minor optimization. + @read_client = Memcached.new(self.servers[rand(@server_count)], opts) + @write_client = Memcached.new(self.servers[rand(@server_count)], opts) end - alias _super_get_from_random get - private :_super_get_from_random - - def get_from_random(key, raw=false) - _super_get_from_random key, !raw - rescue Memcached::NotFound - end - - # use get_from_last if available, otherwise redefine to point to - # plain old get - if method_defined? :get_from_last - - def get_from_last(key, raw=false) - super key, !raw - rescue Memcached::NotFound - end - - else - - $stderr.puts "You have an older version of memcached.gem. Please upgrade to 0.19.6 or later for sticky get behavior." - def get_from_last(key, raw=false) - _super_get_from_random key, !raw - rescue Memcached::NotFound - end - - end # end ifdef :) - def delete(key, expiry=0) - with_retries { super key } + with_retries { @write_client.delete key } rescue Memcached::NotFound, Memcached::ServerEnd end def set(key, value, ttl=0, raw=false) - with_retries { super key, value, ttl, !raw } + with_retries { @write_client.set key, value, ttl, !raw } true rescue Memcached::NotStored false end + # This provides the necessary semantic to support transactionality + # in the Transactional client. It temporarily disables server + # shuffling to allow the client to close any open transactions on + # the current server before jumping. + # + def get_from_last(*args) + @shuffle = false + get *args + ensure + @shuffle = true + end # ==== Parameters # key<String>:: Queue name # opts<Boolean,Hash>:: True/false toggles Marshalling. A Hash # allows collision-avoiding options support. @@ -123,16 +109,17 @@ # :timeout<Integer>:: Milliseconds to block for a new item # :raw<Boolean>:: Toggles Marshalling. Equivalent to the "old # style" second argument. # def get(key, opts = {}) - raw = opts.delete(:raw) || false + raw = opts[:raw] || false commands = extract_queue_commands(opts) val = begin - send(select_get_method(key), key + commands, raw) + shuffle_if_necessary! key + @read_client.get key + commands, !raw rescue *RECOVERABLE_ERRORS # we can't tell the difference between a server being down # and an empty queue, so just return nil. our sticky server # logic should eliminate piling on down servers nil @@ -165,18 +152,22 @@ end opts.replace(memcache_opts) kestrel_opts end - def select_get_method(key) - if key != @current_queue || @counter >= @gets_per_server + def shuffle_if_necessary!(key) + # Don't reset servers on the first request: + # i.e. @counter == 0 && @current_queue == nil + if @shuffle && + (@counter > 0 && key != @current_queue) || + @counter >= @gets_per_server @counter = 0 @current_queue = key - :get_from_random + @read_client.quit + @read_client.set_servers(servers[rand(@server_count)]) else @counter +=1 - :get_from_last end end def extract_queue_commands(opts) commands = [:open, :close, :abort, :peek].select do |key| @@ -187,7 +178,22 @@ commands << "t=#{timeout}" end commands.map { |c| "/#{c}" }.join('') end + + def with_retries #:nodoc: + yield + rescue *RECOVERABLE_ERRORS + tries ||= @exception_retry_limit + 1 + tries -= 1 + if tries > 0 + retry + else + @write_client.quit + @write_client.set_servers(servers[rand(@server_count)]) + raise + end + end + end end