lib/kestrel/client.rb in kestrel-client-0.4.1 vs lib/kestrel/client.rb in kestrel-client-0.5.0

- old
+ new

@@ -1,41 +1,101 @@ module Kestrel - class Client < Memcached::Rails + class Client < Memcached + require 'kestrel/client/stats_helper' + require 'kestrel/client/retry_helper' + autoload :Proxy, 'kestrel/client/proxy' autoload :Envelope, 'kestrel/client/envelope' autoload :Blocking, 'kestrel/client/blocking' autoload :Partitioning, 'kestrel/client/partitioning' autoload :Unmarshal, 'kestrel/client/unmarshal' autoload :Namespace, 'kestrel/client/namespace' autoload :Json, 'kestrel/client/json' autoload :Reliable, "kestrel/client/reliable" - autoload :Retrying, "kestrel/client/retrying" - QUEUE_STAT_NAMES = %w{items bytes total_items logsize expired_items mem_items mem_bytes age discarded} + KESTREL_OPTIONS = [:gets_per_server, :exception_retry_limit, :get_timeout_ms].freeze - # ==== Parameters - # key<String>:: Queue name - # opts<Boolean,Hash>:: True/false toggles Marshalling. A Hash - # allows collision-avoiding options support. - # - # ==== Options (opts) - # :open<Boolean>:: Begins a reliable read. - # :close<Boolean>:: Ends a reliable read. - # :abort<Boolean>:: Cancels an existing reliable read - # :peek<Boolean>:: Return the head of the queue, without removal - # :timeout<Integer>:: Milliseconds to block for a new item - # :raw<Boolean>:: Toggles Marshalling. Equivalent to the "old - # style" second argument. - # - def get(key, opts = false) - opts = extract_options(opts) - raw = opts.delete(:raw) - commands = extract_queue_commands(opts) + DEFAULT_OPTIONS = { + :retry_timeout => 0, + :exception_retry_limit => 5, + :timeout => 0.25, + :gets_per_server => 100, + :get_timeout_ms => 10 + }.freeze - super key + commands, raw + include StatsHelper + include RetryHelper + + + def initialize(*servers) + opts = servers.last.is_a?(Hash) ? servers.pop : {} + opts = DEFAULT_OPTIONS.merge(opts) + + @kestrel_options = extract_kestrel_options!(opts) + @default_get_timeout = kestrel_options[:get_timeout_ms] + @gets_per_server = kestrel_options[:gets_per_server] + @exception_retry_limit = kestrel_options[:exception_retry_limit] + @counter = 0 + + # we handle our own retries so that we can apply different + # policies to sets and gets, so set memcached limit to 0 + opts[:exception_retry_limit] = 0 + opts[:distribution] = :random # force random distribution + + super Array(servers).flatten.compact, opts end + + attr_reader :current_queue, :kestrel_options + + + # Memcached overrides + + %w(add append cas decr incr get_orig prepend).each do |m| + undef_method m + end + + alias _super_get_from_random get + private :_super_get_from_random + + def get_from_random(key, raw=false) + _super_get_from_random key, !raw + rescue Memcached::NotFound + end + + # use get_from_last if available, otherwise redefine to point to + # plain old get + if method_defined? :get_from_last + + def get_from_last(key, raw=false) + super key, !raw + rescue Memcached::NotFound + end + + else + + $stderr.puts "You have an older version of memcached.gem. Please upgrade to 0.19.6 or later for sticky get behavior." + def get_from_last(key, raw=false) + _super_get_from_random key, !raw + rescue Memcached::NotFound + end + + end # end ifdef :) + + def delete(key, expiry=0) + with_retries { super key } + rescue Memcached::NotFound, Memcached::ServerEnd + end + + def set(key, value, ttl=0, raw=false) + with_retries { super key, value, ttl, !raw } + true + rescue Memcached::NotStored + false + end + + # ==== Parameters # key<String>:: Queue name # opts<Boolean,Hash>:: True/false toggles Marshalling. A Hash # allows collision-avoiding options support. # @@ -46,118 +106,72 @@ # :peek<Boolean>:: Return the head of the queue, without removal # :timeout<Integer>:: Milliseconds to block for a new item # :raw<Boolean>:: Toggles Marshalling. Equivalent to the "old # style" second argument. # - def get_from_last(key, opts = {}) - opts = extract_options(opts) - raw = opts.delete(:raw) + def get(key, opts = {}) + raw = opts.delete(:raw) || false commands = extract_queue_commands(opts) - super key + commands, raw + val = + begin + send(select_get_method(key), key + commands, raw) + rescue Memcached::ATimeoutOccurred, Memcached::ServerIsMarkedDead + # we can't tell the difference between a server being down + # and an empty queue, so just return nil. our sticky server + # logic should eliminate piling on down servers + nil + end + + # nil result, force next get to jump from current server + @counter = @gets_per_server unless val + + val end def flush(queue) count = 0 while sizeof(queue) > 0 - while get queue, :raw => true - count += 1 - end + count += 1 while get queue, :raw => true end count end def peek(queue) get queue, :peek => true end - def sizeof(queue) - stat_info = stat(queue) - stat_info ? stat_info['items'] : 0 - end + private - def available_queues - stats['queues'].keys.sort + def extract_kestrel_options!(opts) + kestrel_opts, memcache_opts = opts.inject([{}, {}]) do |(kestrel, memcache), (key, opt)| + (KESTREL_OPTIONS.include?(key) ? kestrel : memcache)[key] = opt + [kestrel, memcache] + end + opts.replace(memcache_opts) + kestrel_opts end - def stats - merge_stats(servers.map { |server| stats_for_server(server) }) + def select_get_method(key) + if key != @current_queue || @counter >= @gets_per_server + @counter = 0 + @current_queue = key + :get_from_random + else + @counter +=1 + :get_from_last + end end - def stat(queue) - stats['queues'][queue] - end - - private - - def extract_options(opts) - opts.is_a?(Hash) ? opts : { :raw => !!opts } - end - def extract_queue_commands(opts) commands = [:open, :close, :abort, :peek].select do |key| opts[key] end - commands << "t=#{opts[:timeout]}" if opts[:timeout] - - commands.map { |c| "/#{c}" }.join('') - end - - def stats_for_server(server) - server_name, port = server.split(/:/) - socket = TCPSocket.new(server_name, port) - socket.puts "STATS" - - stats = Hash.new - stats['queues'] = Hash.new - while line = socket.readline - if line =~ /^STAT queue_(\S+?)_(#{QUEUE_STAT_NAMES.join("|")}) (\S+)/ - queue_name, queue_stat_name, queue_stat_value = $1, $2, deserialize_stat_value($3) - stats['queues'][queue_name] ||= Hash.new - stats['queues'][queue_name][queue_stat_name] = queue_stat_value - elsif line =~ /^STAT (\w+) (\S+)/ - stat_name, stat_value = $1, deserialize_stat_value($2) - stats[stat_name] = stat_value - elsif line =~ /^END/ - socket.close - break - elsif defined?(RAILS_DEFAULT_LOGGER) - RAILS_DEFAULT_LOGGER.debug("KestrelClient#stats_for_server: Ignoring #{line}") - end + if timeout = (opts[:timeout] || @default_get_timeout) + commands << "t=#{timeout}" end - stats - end - - def merge_stats(all_stats) - result = Hash.new - - all_stats.each do |stats| - stats.each do |stat_name, stat_value| - if result.has_key?(stat_name) - if stat_value.kind_of?(Hash) - result[stat_name] = merge_stats([result[stat_name], stat_value]) - else - result[stat_name] += stat_value - end - else - result[stat_name] = stat_value - end - end - end - - result - end - - def deserialize_stat_value(value) - case value - when /^\d+\.\d+$/: - value.to_f - when /^\d+$/: - value.to_i - else - value - end + commands.map { |c| "/#{c}" }.join('') end end end