lib/kestrel/client.rb in kestrel-client-0.6.1 vs lib/kestrel/client.rb in kestrel-client-0.6.4
- old
+ new
@@ -1,9 +1,10 @@
+require 'forwardable'
+
module Kestrel
- class Client < Memcached
+ class Client
require 'kestrel/client/stats_helper'
- require 'kestrel/client/retry_helper'
autoload :Proxy, 'kestrel/client/proxy'
autoload :Envelope, 'kestrel/client/envelope'
autoload :Blocking, 'kestrel/client/blocking'
autoload :Partitioning, 'kestrel/client/partitioning'
@@ -33,84 +34,69 @@
Memcached::MemoryAllocationFailure,
Memcached::ReadFailure,
Memcached::ServerError,
Memcached::SystemError,
Memcached::UnknownReadFailure,
- Memcached::WriteFailure
+ Memcached::WriteFailure,
+ Memcached::NotFound
]
+ extend Forwardable
include StatsHelper
- include RetryHelper
+ attr_accessor :servers, :options
+ attr_reader :current_queue, :kestrel_options, :current_server
+ def_delegators :@write_client, :add, :append, :cas, :decr, :incr, :get_orig, :prepend
+
def initialize(*servers)
opts = servers.last.is_a?(Hash) ? servers.pop : {}
opts = DEFAULT_OPTIONS.merge(opts)
@kestrel_options = extract_kestrel_options!(opts)
@default_get_timeout = kestrel_options[:get_timeout_ms]
@gets_per_server = kestrel_options[:gets_per_server]
@exception_retry_limit = kestrel_options[:exception_retry_limit]
@counter = 0
+ @shuffle = true
# we handle our own retries so that we can apply different
# policies to sets and gets, so set memcached limit to 0
opts[:exception_retry_limit] = 0
opts[:distribution] = :random # force random distribution
- super Array(servers).flatten.compact, opts
- end
+ self.servers = Array(servers).flatten.compact
+ self.options = opts
-
- attr_reader :current_queue, :kestrel_options
-
-
- # Memcached overrides
-
- %w(add append cas decr incr get_orig prepend).each do |m|
- undef_method m
+ @server_count = self.servers.size # Minor optimization.
+ @read_client = Memcached.new(self.servers[rand(@server_count)], opts)
+ @write_client = Memcached.new(self.servers[rand(@server_count)], opts)
end
- alias _super_get_from_random get
- private :_super_get_from_random
-
- def get_from_random(key, raw=false)
- _super_get_from_random key, !raw
- rescue Memcached::NotFound
- end
-
- # use get_from_last if available, otherwise redefine to point to
- # plain old get
- if method_defined? :get_from_last
-
- def get_from_last(key, raw=false)
- super key, !raw
- rescue Memcached::NotFound
- end
-
- else
-
- $stderr.puts "You have an older version of memcached.gem. Please upgrade to 0.19.6 or later for sticky get behavior."
- def get_from_last(key, raw=false)
- _super_get_from_random key, !raw
- rescue Memcached::NotFound
- end
-
- end # end ifdef :)
-
def delete(key, expiry=0)
- with_retries { super key }
+ with_retries { @write_client.delete key }
rescue Memcached::NotFound, Memcached::ServerEnd
end
def set(key, value, ttl=0, raw=false)
- with_retries { super key, value, ttl, !raw }
+ with_retries { @write_client.set key, value, ttl, !raw }
true
rescue Memcached::NotStored
false
end
+ # This provides the necessary semantic to support transactionality
+ # in the Transactional client. It temporarily disables server
+ # shuffling to allow the client to close any open transactions on
+ # the current server before jumping.
+ #
+ def get_from_last(*args)
+ @shuffle = false
+ get *args
+ ensure
+ @shuffle = true
+ end
# ==== Parameters
# key<String>:: Queue name
# opts<Boolean,Hash>:: True/false toggles Marshalling. A Hash
# allows collision-avoiding options support.
@@ -123,16 +109,17 @@
# :timeout<Integer>:: Milliseconds to block for a new item
# :raw<Boolean>:: Toggles Marshalling. Equivalent to the "old
# style" second argument.
#
def get(key, opts = {})
- raw = opts.delete(:raw) || false
+ raw = opts[:raw] || false
commands = extract_queue_commands(opts)
val =
begin
- send(select_get_method(key), key + commands, raw)
+ shuffle_if_necessary! key
+ @read_client.get key + commands, !raw
rescue *RECOVERABLE_ERRORS
# we can't tell the difference between a server being down
# and an empty queue, so just return nil. our sticky server
# logic should eliminate piling on down servers
nil
@@ -165,18 +152,22 @@
end
opts.replace(memcache_opts)
kestrel_opts
end
- def select_get_method(key)
- if key != @current_queue || @counter >= @gets_per_server
+ def shuffle_if_necessary!(key)
+ # Don't reset servers on the first request:
+ # i.e. @counter == 0 && @current_queue == nil
+ if @shuffle &&
+ (@counter > 0 && key != @current_queue) ||
+ @counter >= @gets_per_server
@counter = 0
@current_queue = key
- :get_from_random
+ @read_client.quit
+ @read_client.set_servers(servers[rand(@server_count)])
else
@counter +=1
- :get_from_last
end
end
def extract_queue_commands(opts)
commands = [:open, :close, :abort, :peek].select do |key|
@@ -187,7 +178,22 @@
commands << "t=#{timeout}"
end
commands.map { |c| "/#{c}" }.join('')
end
+
+ def with_retries #:nodoc:
+ yield
+ rescue *RECOVERABLE_ERRORS
+ tries ||= @exception_retry_limit + 1
+ tries -= 1
+ if tries > 0
+ retry
+ else
+ @write_client.quit
+ @write_client.set_servers(servers[rand(@server_count)])
+ raise
+ end
+ end
+
end
end