lib/dalli/client.rb in dalli-3.1.0 vs lib/dalli/client.rb in dalli-3.1.1

- old
+ new

@@ -47,40 +47,55 @@ # pass an alternative implementation using another protocol. # def initialize(servers = nil, options = {}) @servers = ::Dalli::ServersArgNormalizer.normalize_servers(servers) @options = normalize_options(options) - @key_manager = ::Dalli::KeyManager.new(options) + @key_manager = ::Dalli::KeyManager.new(@options) @ring = nil end # # The standard memcached instruction set # ## - # Turn on quiet aka noreply support. - # All relevant operations within this block will be effectively - # pipelined as Dalli will use 'quiet' operations where possible. - # Currently supports the set, add, replace and delete operations. - def multi - old = Thread.current[::Dalli::MULTI_KEY] - Thread.current[::Dalli::MULTI_KEY] = true - yield - ensure - @ring&.pipeline_consume_and_ignore_responses - Thread.current[::Dalli::MULTI_KEY] = old + # Get the value associated with the key. + # If a value is not found, then +nil+ is returned. + def get(key, req_options = nil) + perform(:get, key, req_options) end ## - # Get the value associated with the key. + # Gat (get and touch) fetch an item and simultaneously update its expiration time. + # # If a value is not found, then +nil+ is returned. - def get(key, options = nil) - perform(:get, key, options) + def gat(key, ttl = nil) + perform(:gat, key, ttl_or_default(ttl)) end ## + # Touch updates expiration time for a given key. + # + # Returns true if key exists, otherwise nil. + def touch(key, ttl = nil) + resp = perform(:touch, key, ttl_or_default(ttl)) + resp.nil? ? nil : true + end + + ## + # Get the value and CAS ID associated with the key. If a block is provided, + # value and CAS will be passed to the block. + def get_cas(key) + (value, cas) = perform(:cas, key) + # TODO: This is odd. Confirm this is working as expected. + value = nil if !value || value == 'Not found' + return [value, cas] unless block_given? + + yield value, cas + end + + ## # Fetch multiple keys efficiently. # If a block is given, yields key/value pairs one at a time. # Otherwise returns a hash of { 'key' => 'value', 'key2' => 'value1' } def get_multi(*keys) keys.flatten! @@ -95,11 +110,25 @@ pipelined_getter.process(keys) { |k, data| hash[k] = data.first } end end end - CACHE_NILS = { cache_nils: true }.freeze + ## + # Fetch multiple keys efficiently, including available metadata such as CAS. + # If a block is given, yields key/data pairs one a time. Data is an array: + # [value, cas_id] + # If no block is given, returns a hash of + # { 'key' => [value, cas_id] } + def get_multi_cas(*keys) + if block_given? + pipelined_getter.process(keys) { |*args| yield(*args) } + else + {}.tap do |hash| + pipelined_getter.process(keys) { |k, data| hash[k] = data } + end + end + end # Fetch the value associated with the key. # If a value is found, then it is returned. # # If a value is not found and no block is given, then nil is returned. @@ -108,25 +137,17 @@ # and a block is given, the block will be invoked and its return value # written to the cache and returned. def fetch(key, ttl = nil, req_options = nil) req_options = req_options.nil? ? CACHE_NILS : req_options.merge(CACHE_NILS) if cache_nils val = get(key, req_options) - if not_found?(val) && block_given? - val = yield - add(key, val, ttl_or_default(ttl), req_options) - end - val - end + return val unless block_given? && not_found?(val) - def not_found?(val) - cache_nils ? val == ::Dalli::NOT_FOUND : val.nil? + new_val = yield + add(key, new_val, ttl_or_default(ttl), req_options) + new_val end - def cache_nils - @options[:cache_nils] - end - ## # compare and swap values using optimistic locking. # Fetch the existing value for key. # If it exists, yield the value to the block. # Add the block's return value as the new value for the key. @@ -134,45 +155,93 @@ # # Returns: # - nil if the key did not exist. # - false if the value was changed by someone else. # - true if the value was successfully updated. - def cas(key, ttl = nil, options = nil, &block) - cas_core(key, false, ttl, options, &block) + def cas(key, ttl = nil, req_options = nil, &block) + cas_core(key, false, ttl, req_options, &block) end ## # like #cas, but will yield to the block whether or not the value # already exists. # # Returns: # - false if the value was changed by someone else. # - true if the value was successfully updated. - def cas!(key, ttl = nil, options = nil, &block) - cas_core(key, true, ttl, options, &block) + def cas!(key, ttl = nil, req_options = nil, &block) + cas_core(key, true, ttl, req_options, &block) end - def set(key, value, ttl = nil, options = nil) - perform(:set, key, value, ttl_or_default(ttl), 0, options) + ## + # Turn on quiet aka noreply support for a number of + # memcached operations. + # + # All relevant operations within this block will be effectively + # pipelined as Dalli will use 'quiet' versions. The invoked methods + # will all return nil, rather than their usual response. Method + # latency will be substantially lower, as the caller will not be + # blocking on responses. + # + # Currently supports storage (set, add, replace, append, prepend), + # arithmetic (incr, decr), flush and delete operations. Use of + # unsupported operations inside a block will raise an error. + # + # Any error replies will be discarded at the end of the block, and + # Dalli client methods invoked inside the block will not + # have return values + def quiet + old = Thread.current[::Dalli::QUIET] + Thread.current[::Dalli::QUIET] = true + yield + ensure + @ring&.pipeline_consume_and_ignore_responses + Thread.current[::Dalli::QUIET] = old end + alias multi quiet + def set(key, value, ttl = nil, req_options = nil) + set_cas(key, value, 0, ttl, req_options) + end + ## + # Set the key-value pair, verifying existing CAS. + # Returns the resulting CAS value if succeeded, and falsy otherwise. + def set_cas(key, value, cas, ttl = nil, req_options = nil) + perform(:set, key, value, ttl_or_default(ttl), cas, req_options) + end + + ## # Conditionally add a key/value pair, if the key does not already exist # on the server. Returns truthy if the operation succeeded. - def add(key, value, ttl = nil, options = nil) - perform(:add, key, value, ttl_or_default(ttl), options) + def add(key, value, ttl = nil, req_options = nil) + perform(:add, key, value, ttl_or_default(ttl), req_options) end ## # Conditionally add a key/value pair, only if the key already exists # on the server. Returns truthy if the operation succeeded. - def replace(key, value, ttl = nil, options = nil) - perform(:replace, key, value, ttl_or_default(ttl), 0, options) + def replace(key, value, ttl = nil, req_options = nil) + replace_cas(key, value, 0, ttl, req_options) end + ## + # Conditionally add a key/value pair, verifying existing CAS, only if the + # key already exists on the server. Returns the new CAS value if the + # operation succeeded, or falsy otherwise. + def replace_cas(key, value, cas, ttl = nil, req_options = nil) + perform(:replace, key, value, ttl_or_default(ttl), cas, req_options) + end + + # Delete a key/value pair, verifying existing CAS. + # Returns true if succeeded, and falsy otherwise. + def delete_cas(key, cas = 0) + perform(:delete, key, cas) + end + def delete(key) - perform(:delete, key, 0) + delete_cas(key, 0) end ## # Append value to the value already stored on the server for 'key'. # Appending only works for values stored with :raw => true. @@ -185,17 +254,10 @@ # Prepending only works for values stored with :raw => true. def prepend(key, value) perform(:prepend, key, value.to_s) end - def flush(delay = 0) - time = -delay - ring.servers.map { |s| s.request(:flush, time += delay) } - end - - alias flush_all flush - ## # Incr adds the given amount to the counter on the memcached server. # Amt must be a positive integer value. # # If default is nil, the counter must already exist or the operation @@ -203,12 +265,14 @@ # the new value for the counter. # # Note that the ttl will only apply if the counter does not already # exist. To increase an existing counter and update its TTL, use # #cas. + # + # If the value already exists, it must have been set with raw: true def incr(key, amt = 1, ttl = nil, default = nil) - raise ArgumentError, "Positive values only: #{amt}" if amt.negative? + check_positive!(amt) perform(:incr, key, amt.to_i, ttl_or_default(ttl), default) end ## @@ -223,39 +287,35 @@ # the new value for the counter. # # Note that the ttl will only apply if the counter does not already # exist. To decrease an existing counter and update its TTL, use # #cas. + # + # If the value already exists, it must have been set with raw: true def decr(key, amt = 1, ttl = nil, default = nil) - raise ArgumentError, "Positive values only: #{amt}" if amt.negative? + check_positive!(amt) perform(:decr, key, amt.to_i, ttl_or_default(ttl), default) end ## - # Touch updates expiration time for a given key. - # - # Returns true if key exists, otherwise nil. - def touch(key, ttl = nil) - resp = perform(:touch, key, ttl_or_default(ttl)) - resp.nil? ? nil : true - end - + # Flush the memcached server, at 'delay' seconds in the future. + # Delay defaults to zero seconds, which means an immediate flush. ## - # Gat (get and touch) fetch an item and simultaneously update its expiration time. - # - # If a value is not found, then +nil+ is returned. - def gat(key, ttl = nil) - perform(:gat, key, ttl_or_default(ttl)) + def flush(delay = 0) + ring.servers.map { |s| s.request(:flush, delay) } end + alias flush_all flush + ALLOWED_STAT_KEYS = %i[items slabs settings].freeze + ## # Collect the stats for each server. # You can optionally pass a type including :items, :slabs or :settings to get specific stats # Returns a hash like { 'hostname:port' => { 'stat1' => 'value1', ... }, 'hostname2:port' => { ... } } def stats(type = nil) - type = nil unless [nil, :items, :slabs, :settings].include? type + type = nil unless ALLOWED_STAT_KEYS.include? type values = {} ring.servers.each do |server| values[server.name.to_s] = server.alive? ? server.request(:stats, type.to_s) : nil end values @@ -268,102 +328,68 @@ server.alive? ? server.request(:reset_stats) : nil end end ## - ## Make sure memcache servers are alive, or raise an Dalli::RingError - def alive! - ring.server_for_key('') - end - - ## ## Version of the memcache servers. def version values = {} ring.servers.each do |server| values[server.name.to_s] = server.alive? ? server.request(:version) : nil end values end ## - # Get the value and CAS ID associated with the key. If a block is provided, - # value and CAS will be passed to the block. - def get_cas(key) - (value, cas) = perform(:cas, key) - value = nil if !value || value == 'Not found' - if block_given? - yield value, cas - else - [value, cas] - end + ## Make sure memcache servers are alive, or raise an Dalli::RingError + def alive! + ring.server_for_key('') end ## - # Fetch multiple keys efficiently, including available metadata such as CAS. - # If a block is given, yields key/data pairs one a time. Data is an array: - # [value, cas_id] - # If no block is given, returns a hash of - # { 'key' => [value, cas_id] } - def get_multi_cas(*keys) - if block_given? - pipelined_getter.process(keys) { |*args| yield(*args) } - else - {}.tap do |hash| - pipelined_getter.process(keys) { |k, data| hash[k] = data } - end - end - end - - ## - # Set the key-value pair, verifying existing CAS. - # Returns the resulting CAS value if succeeded, and falsy otherwise. - def set_cas(key, value, cas, ttl = nil, options = nil) - ttl ||= @options[:expires_in].to_i - perform(:set, key, value, ttl, cas, options) - end - - ## - # Conditionally add a key/value pair, verifying existing CAS, only if the - # key already exists on the server. Returns the new CAS value if the - # operation succeeded, or falsy otherwise. - def replace_cas(key, value, cas, ttl = nil, options = nil) - ttl ||= @options[:expires_in].to_i - perform(:replace, key, value, ttl, cas, options) - end - - # Delete a key/value pair, verifying existing CAS. - # Returns true if succeeded, and falsy otherwise. - def delete_cas(key, cas = 0) - perform(:delete, key, cas) - end - - ## # Close our connection to each server. # If you perform another operation after this, the connections will be re-established. def close @ring&.close @ring = nil end alias reset close + CACHE_NILS = { cache_nils: true }.freeze + + def not_found?(val) + cache_nils ? val == ::Dalli::NOT_FOUND : val.nil? + end + + def cache_nils + @options[:cache_nils] + end + # Stub method so a bare Dalli client can pretend to be a connection pool. def with yield self end private - def cas_core(key, always_set, ttl = nil, options = nil) + def check_positive!(amt) + raise ArgumentError, "Positive values only: #{amt}" if amt.negative? + end + + def cas_core(key, always_set, ttl = nil, req_options = nil) (value, cas) = perform(:cas, key) value = nil if !value || value == 'Not found' return if value.nil? && !always_set newvalue = yield(value) - perform(:set, key, newvalue, ttl_or_default(ttl), cas, options) + perform(:set, key, newvalue, ttl_or_default(ttl), cas, req_options) end + ## + # Uses the argument TTL or the client-wide default. Ensures + # that the value is an integer + ## def ttl_or_default(ttl) (ttl || @options[:expires_in]).to_i rescue NoMethodError raise ArgumentError, "Cannot convert ttl (#{ttl}) to an integer" end @@ -380,10 +406,19 @@ def protocol_implementation @protocol_implementation ||= @options.fetch(:protocol_implementation, Dalli::Protocol::Binary) end - # Chokepoint method for instrumentation + ## + # Chokepoint method for memcached methods with a key argument. + # Validates the key, resolves the key to the appropriate server + # instance, and invokes the memcached method on the appropriate + # server. + # + # This method also forces retries on network errors - when + # a particular memcached instance becomes unreachable, or the + # operational times out. + ## def perform(*all_args) return yield if block_given? op, key, *args = all_args