lib/kestrel/client.rb in kestrel-client-0.4.1 vs lib/kestrel/client.rb in kestrel-client-0.5.0
- old
+ new
@@ -1,41 +1,101 @@
module Kestrel
- class Client < Memcached::Rails
+ class Client < Memcached
+ require 'kestrel/client/stats_helper'
+ require 'kestrel/client/retry_helper'
+
autoload :Proxy, 'kestrel/client/proxy'
autoload :Envelope, 'kestrel/client/envelope'
autoload :Blocking, 'kestrel/client/blocking'
autoload :Partitioning, 'kestrel/client/partitioning'
autoload :Unmarshal, 'kestrel/client/unmarshal'
autoload :Namespace, 'kestrel/client/namespace'
autoload :Json, 'kestrel/client/json'
autoload :Reliable, "kestrel/client/reliable"
- autoload :Retrying, "kestrel/client/retrying"
- QUEUE_STAT_NAMES = %w{items bytes total_items logsize expired_items mem_items mem_bytes age discarded}
+ KESTREL_OPTIONS = [:gets_per_server, :exception_retry_limit, :get_timeout_ms].freeze
- # ==== Parameters
- # key<String>:: Queue name
- # opts<Boolean,Hash>:: True/false toggles Marshalling. A Hash
- # allows collision-avoiding options support.
- #
- # ==== Options (opts)
- # :open<Boolean>:: Begins a reliable read.
- # :close<Boolean>:: Ends a reliable read.
- # :abort<Boolean>:: Cancels an existing reliable read
- # :peek<Boolean>:: Return the head of the queue, without removal
- # :timeout<Integer>:: Milliseconds to block for a new item
- # :raw<Boolean>:: Toggles Marshalling. Equivalent to the "old
- # style" second argument.
- #
- def get(key, opts = false)
- opts = extract_options(opts)
- raw = opts.delete(:raw)
- commands = extract_queue_commands(opts)
+ DEFAULT_OPTIONS = {
+ :retry_timeout => 0,
+ :exception_retry_limit => 5,
+ :timeout => 0.25,
+ :gets_per_server => 100,
+ :get_timeout_ms => 10
+ }.freeze
- super key + commands, raw
+ include StatsHelper
+ include RetryHelper
+
+
+ def initialize(*servers)
+ opts = servers.last.is_a?(Hash) ? servers.pop : {}
+ opts = DEFAULT_OPTIONS.merge(opts)
+
+ @kestrel_options = extract_kestrel_options!(opts)
+ @default_get_timeout = kestrel_options[:get_timeout_ms]
+ @gets_per_server = kestrel_options[:gets_per_server]
+ @exception_retry_limit = kestrel_options[:exception_retry_limit]
+ @counter = 0
+
+ # we handle our own retries so that we can apply different
+ # policies to sets and gets, so set memcached limit to 0
+ opts[:exception_retry_limit] = 0
+ opts[:distribution] = :random # force random distribution
+
+ super Array(servers).flatten.compact, opts
end
+
+ attr_reader :current_queue, :kestrel_options
+
+
+ # Memcached overrides
+
+ %w(add append cas decr incr get_orig prepend).each do |m|
+ undef_method m
+ end
+
+ alias _super_get_from_random get
+ private :_super_get_from_random
+
+ def get_from_random(key, raw=false)
+ _super_get_from_random key, !raw
+ rescue Memcached::NotFound
+ end
+
+ # use get_from_last if available, otherwise redefine to point to
+ # plain old get
+ if method_defined? :get_from_last
+
+ def get_from_last(key, raw=false)
+ super key, !raw
+ rescue Memcached::NotFound
+ end
+
+ else
+
+ $stderr.puts "You have an older version of memcached.gem. Please upgrade to 0.19.6 or later for sticky get behavior."
+ def get_from_last(key, raw=false)
+ _super_get_from_random key, !raw
+ rescue Memcached::NotFound
+ end
+
+ end # end ifdef :)
+
+ def delete(key, expiry=0)
+ with_retries { super key }
+ rescue Memcached::NotFound, Memcached::ServerEnd
+ end
+
+ def set(key, value, ttl=0, raw=false)
+ with_retries { super key, value, ttl, !raw }
+ true
+ rescue Memcached::NotStored
+ false
+ end
+
+
# ==== Parameters
# key<String>:: Queue name
# opts<Boolean,Hash>:: True/false toggles Marshalling. A Hash
# allows collision-avoiding options support.
#
@@ -46,118 +106,72 @@
# :peek<Boolean>:: Return the head of the queue, without removal
# :timeout<Integer>:: Milliseconds to block for a new item
# :raw<Boolean>:: Toggles Marshalling. Equivalent to the "old
# style" second argument.
#
- def get_from_last(key, opts = {})
- opts = extract_options(opts)
- raw = opts.delete(:raw)
+ def get(key, opts = {})
+ raw = opts.delete(:raw) || false
commands = extract_queue_commands(opts)
- super key + commands, raw
+ val =
+ begin
+ send(select_get_method(key), key + commands, raw)
+ rescue Memcached::ATimeoutOccurred, Memcached::ServerIsMarkedDead
+ # we can't tell the difference between a server being down
+ # and an empty queue, so just return nil. our sticky server
+ # logic should eliminate piling on down servers
+ nil
+ end
+
+ # nil result, force next get to jump from current server
+ @counter = @gets_per_server unless val
+
+ val
end
def flush(queue)
count = 0
while sizeof(queue) > 0
- while get queue, :raw => true
- count += 1
- end
+ count += 1 while get queue, :raw => true
end
count
end
def peek(queue)
get queue, :peek => true
end
- def sizeof(queue)
- stat_info = stat(queue)
- stat_info ? stat_info['items'] : 0
- end
+ private
- def available_queues
- stats['queues'].keys.sort
+ def extract_kestrel_options!(opts)
+ kestrel_opts, memcache_opts = opts.inject([{}, {}]) do |(kestrel, memcache), (key, opt)|
+ (KESTREL_OPTIONS.include?(key) ? kestrel : memcache)[key] = opt
+ [kestrel, memcache]
+ end
+ opts.replace(memcache_opts)
+ kestrel_opts
end
- def stats
- merge_stats(servers.map { |server| stats_for_server(server) })
+ def select_get_method(key)
+ if key != @current_queue || @counter >= @gets_per_server
+ @counter = 0
+ @current_queue = key
+ :get_from_random
+ else
+ @counter +=1
+ :get_from_last
+ end
end
- def stat(queue)
- stats['queues'][queue]
- end
-
- private
-
- def extract_options(opts)
- opts.is_a?(Hash) ? opts : { :raw => !!opts }
- end
-
def extract_queue_commands(opts)
commands = [:open, :close, :abort, :peek].select do |key|
opts[key]
end
- commands << "t=#{opts[:timeout]}" if opts[:timeout]
-
- commands.map { |c| "/#{c}" }.join('')
- end
-
- def stats_for_server(server)
- server_name, port = server.split(/:/)
- socket = TCPSocket.new(server_name, port)
- socket.puts "STATS"
-
- stats = Hash.new
- stats['queues'] = Hash.new
- while line = socket.readline
- if line =~ /^STAT queue_(\S+?)_(#{QUEUE_STAT_NAMES.join("|")}) (\S+)/
- queue_name, queue_stat_name, queue_stat_value = $1, $2, deserialize_stat_value($3)
- stats['queues'][queue_name] ||= Hash.new
- stats['queues'][queue_name][queue_stat_name] = queue_stat_value
- elsif line =~ /^STAT (\w+) (\S+)/
- stat_name, stat_value = $1, deserialize_stat_value($2)
- stats[stat_name] = stat_value
- elsif line =~ /^END/
- socket.close
- break
- elsif defined?(RAILS_DEFAULT_LOGGER)
- RAILS_DEFAULT_LOGGER.debug("KestrelClient#stats_for_server: Ignoring #{line}")
- end
+ if timeout = (opts[:timeout] || @default_get_timeout)
+ commands << "t=#{timeout}"
end
- stats
- end
-
- def merge_stats(all_stats)
- result = Hash.new
-
- all_stats.each do |stats|
- stats.each do |stat_name, stat_value|
- if result.has_key?(stat_name)
- if stat_value.kind_of?(Hash)
- result[stat_name] = merge_stats([result[stat_name], stat_value])
- else
- result[stat_name] += stat_value
- end
- else
- result[stat_name] = stat_value
- end
- end
- end
-
- result
- end
-
- def deserialize_stat_value(value)
- case value
- when /^\d+\.\d+$/:
- value.to_f
- when /^\d+$/:
- value.to_i
- else
- value
- end
+ commands.map { |c| "/#{c}" }.join('')
end
end
end