# frozen_string_literal: true, encoding: ASCII-8BIT require 'forwardable' require 'thread' module Libcouchbase class Bucket extend Forwardable # Finalizer done right # http://www.mikeperham.com/2010/02/24/the-trouble-with-ruby-finalizers/ def self.finalize(connection) proc { connection.reactor.unref connection.destroy } end def initialize(**options) @connection_options = options @connection = Connection.new(**options) connect # This obtains the connections reactor @reactor = reactor @quiet = true # clean up the connection once this object is garbage collected ObjectSpace.define_finalizer( self, self.class.finalize(@connection) ) end attr_reader :connection attr_accessor :quiet def_delegators :@connection, :bucket, :reactor # Obtain an object stored in Couchbase by given key. # # @param keys [String, Symbol, Array] One or several keys to fetch # @param options [Hash] Options for operation. # @option options [Integer] :lock time to lock this key for. Max time 30s # @option options [true, false] :extended (false) If set to +true+, the # operation will return a +Libcouchbase::Result+, otherwise (by default) # it returns just the value. # @option options [true, false] :quiet (self.quiet) If set to +true+, the # operation won't raise error for missing key, it will return +nil+. # Otherwise it will raise a not found error. # @option options [Symbol] :format the value should be stored as. # Defaults to :document # @option options [true, false] :assemble_hash (false) Assemble Hash for # results. # # @return [Object, Array, Hash, Libcouchbase::Result] the value(s) # # @raise [Libcouchbase::Error::KeyExists] if the key already exists on the server # with a different CAS value to that provided # @raise [Libouchbase::Error::Timedout] if timeout interval for observe exceeds # @raise [Libouchbase::Error::NetworkError] if there was a communication issue # @raise [Libcouchbase::Error::KeyNotFound] if the key doesn't exists # # @example Get single value in quiet mode (the default) # c.get("foo") #=> the associated value or nil # # @example Use alternative hash-like syntax # c["foo"] #=> the associated value or nil # # @example Get single value in verbose mode # c.get("missing-foo", quiet: false) #=> raises Libcouchbase::Error::NotFound # # @example Get multiple keys # c.get("foo", "bar", "baz") #=> [val1, val2, val3] # # @example Get multiple keys with assembing result into the Hash # c.get("foo", "bar", "baz", assemble_hash: true) # #=> {"foo" => val1, "bar" => val2, "baz" => val3} # # @example Get and lock key using default timeout # c.get("foo", lock: true) # This locks for the maximum time of 30 seconds # # @example Get and lock key using custom timeout # c.get("foo", lock: 3) # # @example Get and lock multiple keys using custom timeout # c.get("foo", "bar", lock: 3) def get(key, *keys, extended: false, async: false, quiet: @quiet, assemble_hash: false, **opts) was_array = key.respond_to?(:to_a) || keys.length > 0 keys.unshift Array(key) # Convert enumerables keys.flatten! # Ensure we're left with a list of keys if keys.length == 1 promise = @connection.get(keys[0], **opts) unless extended promise = promise.then(proc { |resp| resp.value }) end if quiet promise = promise.catch { |err| if err.is_a? Libcouchbase::Error::KeyNotFound nil else ::Libuv::Q.reject(@reactor, err) end } end if assemble_hash promise = promise.then(proc { |val| hash = defined?(::HashWithIndifferentAccess) ? ::HashWithIndifferentAccess.new : {} hash[keys[0]] = val hash }) elsif was_array promise = promise.then(proc { |val| Array(val) }) end result(promise, async) else promises = keys.collect { |key| @connection.get(key, **opts) } if quiet promises.map! { |prom| prom.catch { |err| if err.is_a? Libcouchbase::Error::KeyNotFound nil else ::Libuv::Q.reject(@reactor, err) end } } end result(@reactor.all(*promises).then(proc { |results| if not extended results.collect! { |resp| resp.value if resp } else results.compact! end if assemble_hash hash = defined?(::HashWithIndifferentAccess) ? ::HashWithIndifferentAccess.new : {} keys.each_with_index do |key, index| hash[key] = results[index] end hash else results end }), async) end end alias_method :[], :get # Add the item to the database, but fail if the object exists already # # @param key [String, Symbol] Key used to reference the value. # @param value [Object] Value to be stored # @param options [Hash] Options for operation. # @option options [Integer] :ttl Expiry time for key in seconds # @option options [Integer] :expire_in Expiry time for key in seconds # @option options [Integer, Time] :expire_at Unix epoc or time at which a key # should expire # @option options [Symbol] :format the value should be stored as. # @option options [Integer] :cas The CAS value for an object. This value is # created on the server and is guaranteed to be unique for each value of # a given key. This value is used to provide simple optimistic # concurrency control when multiple clients or threads try to update an # item simultaneously. # @option options [Integer] :persist_to persist to a number of nodes before returing # a result. Use -1 to persist to the maximum number of nodes # @option options [Integer] :replicate_to replicate to a number of nodes before # returning a result. Use -1 to replicate to the maximum number of nodes # # @return [Libcouchbase::Result] this includes the CAS value of the object. # # @raise [Libcouchbase::Error::KeyExists] if the key already exists on the server # @raise [Libouchbase::Error::Timedout] if timeout interval for observe exceeds # @raise [Libouchbase::Error::NetworkError] if there was a communication issue # # @example Store the key which will be expired in 2 seconds using relative TTL. # c.add("foo", "bar", expire_in: 2) # # @example Store the key which will be expired in 2 seconds using absolute TTL. # c.add(:foo, :bar, expire_at: Time.now.to_i + 2) # # @example Force JSON document format for value # c.add("foo", {"bar" => "baz"}, format: :document) # # @example Set application specific flags (note that it will be OR-ed with format flags) # c.add("foo", "bar", flags: 0x1000) # # @example Ensure that the key will be persisted at least on the one node # c.add("foo", "bar", persist_to: 1) def add(key, value, async: false, **opts) result @connection.store(key, value, **AddDefaults.merge(opts)), async end AddDefaults = {operation: :add}.freeze # Unconditionally store the object in the Couchbase # # @param key [String, Symbol] Key used to reference the value. # @param value [Object] Value to be stored # @param options [Hash] Options for operation. # @option options [Integer] :ttl Expiry time for key in seconds # @option options [Integer] :expire_in Expiry time for key in seconds # @option options [Integer, Time] :expire_at Unix epoc or time at which a key # should expire # @option options [Symbol] :format the value should be stored as. # @option options [Integer] :cas The CAS value for an object. This value is # created on the server and is guaranteed to be unique for each value of # a given key. This value is used to provide simple optimistic # concurrency control when multiple clients or threads try to update an # item simultaneously. # @option options [Integer] :persist_to persist to a number of nodes before returing # a result. Use -1 to persist to the maximum number of nodes # @option options [Integer] :replicate_to replicate to a number of nodes before # returning a result. Use -1 to replicate to the maximum number of nodes # # @return [Libcouchbase::Result] this includes the CAS value of the object. # # @raise [Libcouchbase::Error::KeyExists] if the key already exists on the server # with a different CAS value to that provided # @raise [Libouchbase::Error::Timedout] if timeout interval for observe exceeds # @raise [Libouchbase::Error::NetworkError] if there was a communication issue # # @example Store the key which will be expired in 2 seconds using relative TTL. # c.set("foo", "bar", expire_in: 2) # # @example Store the key which will be expired in 2 seconds using absolute TTL. # c.set(:foo, :bar, expire_at: Time.now.to_i + 2) # # @example Force JSON document format for value # c.set("foo", {"bar" => "baz"}, format: :document) # # @example Use hash-like syntax to store the value # c[:foo] = {bar: :baz} # # @example Set application specific flags (note that it will be OR-ed with format flags) # c.set("foo", "bar", flags: 0x1000) # # @example Perform optimistic locking by specifying last known CAS version # c.set("foo", "bar", cas: 8835713818674332672) # # @example Ensure that the key will be persisted at least on the one node # c.set("foo", "bar", persist_to: 1) def set(key, value, async: false, **opts) # default operation is set result @connection.store(key, value, **opts), async end alias_method :[]=, :set # Replace the existing object in the database # # @param key [String, Symbol] Key used to reference the value. # @param value [Object] Value to be stored # @param options [Hash] Options for operation. # @option options [Integer] :ttl Expiry time for key in seconds # @option options [Integer] :expire_in Expiry time for key in seconds # @option options [Integer, Time] :expire_at Unix epoc or time at which a key # should expire # @option options [Symbol] :format the value should be stored as. # @option options [Integer] :cas The CAS value for an object. This value is # created on the server and is guaranteed to be unique for each value of # a given key. This value is used to provide simple optimistic # concurrency control when multiple clients or threads try to update an # item simultaneously. # @option options [Integer] :persist_to persist to a number of nodes before returing # a result. Use -1 to persist to the maximum number of nodes # @option options [Integer] :replicate_to replicate to a number of nodes before # returning a result. Use -1 to replicate to the maximum number of nodes # # @return [Libcouchbase::Result] this includes the CAS value of the object. # # @raise [Libcouchbase::Error::KeyExists] if the key already exists on the server # with a different CAS value to that provided # @raise [Libouchbase::Error::Timedout] if timeout interval for observe exceeds # @raise [Libouchbase::Error::NetworkError] if there was a communication issue # @raise [Libcouchbase::Error::KeyNotFound] if the key doesn't exists # # @example Store the key which will be expired in 2 seconds using relative TTL. # c.replace("foo", "bar", expire_in: 2) # # @example Store the key which will be expired in 2 seconds using absolute TTL. # c.replace(:foo, :bar, expire_at: Time.now.to_i + 2) # # @example Force JSON document format for value # c.replace("foo", {"bar" => "baz"}, format: :document) # # @example Set application specific flags (note that it will be OR-ed with format flags) # c.replace("foo", "bar", flags: 0x1000) # # @example Ensure that the key will be persisted at least on the one node # c.replace("foo", "bar", persist_to: 1) def replace(key, value, async: false, **opts) result @connection.store(key, value, **ReplaceDefaults.merge(opts)), async end ReplaceDefaults = {operation: :replace}.freeze # Append this object to the existing object # # @note This operation is kind of data-aware from server point of view. # This mean that the server treats value as binary stream and just # perform concatenation, therefore it won't work with +:marshal+ and # +:document+ formats, because of lack of knowledge how to merge values # in these formats. # # @param key [String, Symbol] Key used to reference the value. # @param value [Object] Value to be appended # @param options [Hash] Options for operation. # @option options [Integer] :cas The CAS value for an object. This value is # created on the server and is guaranteed to be unique for each value of # a given key. This value is used to provide simple optimistic # concurrency control when multiple clients or threads try to update an # item simultaneously. # @option options [Integer] :persist_to persist to a number of nodes before returing # a result. Use -1 to persist to the maximum number of nodes # @option options [Integer] :replicate_to replicate to a number of nodes before # returning a result. Use -1 to replicate to the maximum number of nodes # # @return [Libcouchbase::Result] this includes the CAS value of the object. # # @raise [Libcouchbase::Error::KeyExists] if the key already exists on the server # with a different CAS value to that provided # @raise [Libouchbase::Error::Timedout] if timeout interval for observe exceeds # @raise [Libouchbase::Error::NetworkError] if there was a communication issue # @raise [Libcouchbase::Error::KeyNotFound] if the key doesn't exists # # @example Simple append # c.set(:foo, "aaa", format: :plain) # c.append(:foo, "bbb") # c.get("foo") #=> "aaabbb" # # @example Using optimistic locking. The operation will fail on CAS mismatch # resp = c.set("foo", "aaa", format: :plain) # c.append("foo", "bbb", cas: resp.cas) # # @example Ensure that the key will be persisted at least on the one node # c.append("foo", "bar", persist_to: 1) def append(key, value, async: false, **opts) result @connection.store(key, value, **AppendDefaults.merge(opts)), async end AppendDefaults = {operation: :append}.freeze # Prepend this object to the existing object # # @note This operation is kind of data-aware from server point of view. # This mean that the server treats value as binary stream and just # perform concatenation, therefore it won't work with +:marshal+ and # +:document+ formats, because of lack of knowledge how to merge values # in these formats. # # @param key [String, Symbol] Key used to reference the value. # @param value [Object] Value to be appended # @param options [Hash] Options for operation. # @option options [Integer] :cas The CAS value for an object. This value is # created on the server and is guaranteed to be unique for each value of # a given key. This value is used to provide simple optimistic # concurrency control when multiple clients or threads try to update an # item simultaneously. # @option options [Integer] :persist_to persist to a number of nodes before returing # a result. Use -1 to persist to the maximum number of nodes # @option options [Integer] :replicate_to replicate to a number of nodes before # returning a result. Use -1 to replicate to the maximum number of nodes # # @return [Libcouchbase::Result] this includes the CAS value of the object. # # @raise [Libcouchbase::Error::KeyExists] if the key already exists on the server # with a different CAS value to that provided # @raise [Libouchbase::Error::Timedout] if timeout interval for observe exceeds # @raise [Libouchbase::Error::NetworkError] if there was a communication issue # @raise [Libcouchbase::Error::KeyNotFound] if the key doesn't exists # # @example Simple prepend # c.set(:foo, "aaa", format: :plain) # c.prepend(:foo, "bbb") # c.get("foo") #=> "bbbaaa" # # @example Using optimistic locking. The operation will fail on CAS mismatch # resp = c.set("foo", "aaa", format: :plain) # c.prepend("foo", "bbb", cas: resp.cas) # # @example Ensure that the key will be persisted at least on the one node # c.prepend("foo", "bar", persist_to: 1) def prepend(key, value, async: false, **opts) result @connection.store(key, value, **PrependDefaults.merge(opts)), async end PrependDefaults = {operation: :prepend}.freeze # Increment the value of an existing numeric key # # The increment method allow you to increase or decrease a given stored # integer value. Updating the value of a key if it can be parsed to an integer. # The update operation occurs on the server and is provided at the protocol # level. This simplifies what would otherwise be a two-stage get and set # operation. # # @param key [String, Symbol] Key used to reference the value. # @param by [Integer] Integer (up to 64 bits) value to increment or decrement # @param options [Hash] Options for operation. # @option options [true, false] :create (false) If set to +true+, it will # initialize the key with zero value and zero flags (use +:initial+ # option to set another initial value). Note: it won't increment the # missing value. # @option options [Integer] :initial (0) Integer (up to 64 bits) value for # missing key initialization. This option imply +:create+ option is +true+ # @option options [Integer] :ttl Expiry time for key in seconds # @option options [Integer] :expire_in Expiry time for key in seconds # @option options [Integer, Time] :expire_at Unix epoc or time at which a key # should expire # @option options [true, false] :extended (false) If set to +true+, the # operation will return a +Libcouchbase::Result+, otherwise (by default) # it returns just the value. # # @return [Integer] the actual value of the key. # # @raise [Libouchbase::Error::Timedout] if timeout interval for observe exceeds # @raise [Libouchbase::Error::NetworkError] if there was a communication issue # @raise [Libcouchbase::Error::KeyNotFound] if the key doesn't exists # @raise [Libcouchbase::Error::DeltaBadval] if the key contains non-numeric value # # @example Increment key by one # c.incr(:foo) # # @example Increment key by 50 # c.incr("foo", 50) # # @example Increment key by one OR initialize with zero # c.incr("foo", create: true) #=> will return old+1 or 0 # # @example Increment key by one OR initialize with three # c.incr("foo", 50, initial: 3) #=> will return old+50 or 3 # # @example Increment key and get its CAS value # resp = c.incr("foo", :extended => true) # resp.cas #=> 12345 # resp.value #=> 2 def incr(key, by = 1, create: false, extended: false, async: false, **opts) opts[:delta] ||= by opts[:initial] = 0 if create promise = @connection.counter(key, **opts) if not extended promise = promise.then { |resp| resp.value } end result promise, async end # Decrement the value of an existing numeric key # # Helper method, see incr def decr(key, by = 1, **opts) incr(key, -by, **opts) end # Delete the specified key # # @param key [String, Symbol] Key used to reference the value. # @param options [Hash] Options for operation. # @option options [Integer] :cas The CAS value for an object. This value is # created on the server and is guaranteed to be unique for each value of # a given key. This value is used to provide simple optimistic # concurrency control when multiple clients or threads try to modify an # item simultaneously. # @option options [true, false] :quiet (self.quiet) If set to +true+, the # operation won't raise error for missing key, it will return +nil+. # Otherwise it will raise error. # # @return [true, false] the result of the operation. # # @raise [Libcouchbase::Error::KeyExists] if the key already exists on the server # with a different CAS value to that provided # @raise [Libouchbase::Error::Timedout] if timeout interval for observe exceeds # @raise [Libouchbase::Error::NetworkError] if there was a communication issue # @raise [Libcouchbase::Error::KeyNotFound] if the key doesn't exists # # @example Delete the key in quiet mode (default) # c.set("foo", "bar") # c.delete("foo") #=> true # c.delete("foo") #=> false # # @example Delete the key verbosely # c.set("foo", "bar") # c.delete("foo", quiet: false) #=> true # c.delete("foo", quiet: true) #=> nil (default behaviour) # c.delete("foo", quiet: false) #=> will raise Libcouchbase::Error::KeyNotFound # # @example Delete the key with version check # res = c.set("foo", "bar") #=> #:document, :flags=>0}> # c.delete("foo", cas: 123456) #=> will raise Libcouchbase::Error::KeyExists # c.delete("foo", cas: res.cas) #=> true def delete(key, async: false, quiet: @quiet, **opts) promise = @connection.remove(key, **opts).then { true } if quiet promise = promise.catch { |error| if error.is_a? Libcouchbase::Error::KeyNotFound false else ::Libuv::Q.reject(@reactor, error) end } end result promise, async end # Delete contents of the bucket # # @see http://docs.couchbase.com/admin/admin/REST/rest-bucket-flush.html # # @raise [Libcouchbase::Error::HttpError] in case of an error is # encountered. # # @return [Libcouchbase::Response] # # @example Simple flush the bucket # c.flush def flush(async: false) result @connection.flush, async end # Touch a key, changing its CAS and optionally setting a timeout def touch(async: false, **opts) result @connection.touch(**opts), async end # Fetch design docs stored in current bucket # # @return [Libcouchbase::DesignDocs] def design_docs(**opts) DesignDocs.new(self, @connection, method(:result), **opts) end # Returns an enumerable for the results in a view. # # Results are lazily loaded when an operation is performed on the enum # # @return [Libcouchbase::Results] def view(design, view, include_docs: true, is_spatial: false, **opts, &row_modifier) view = @connection.query_view(design, view, **ViewDefaults.merge(opts)) view.include_docs = include_docs view.is_spatial = is_spatial current = ::Libuv::Reactor.current if current && current.running? ResultsLibuv.new(view, current, &row_modifier) elsif Object.const_defined?(:EventMachine) && EM.reactor_thread? ResultsEM.new(view, &row_modifier) else ResultsNative.new(view, &row_modifier) end end ViewDefaults = { on_error: :stop, stale: false } # Returns an enumerable for the results in a full text search. # # Results are lazily loaded when an operation is performed on the enum # # @return [Libcouchbase::Results] def full_text_search(index, query, **opts, &row_modifier) if query.is_a? Hash opts[:query] = query else opts[:query] = {query: query} end fts = @connection.full_text_search(index, **FtsDefaults.merge(opts)) current = ::Libuv::Reactor.current if current && current.running? ResultsLibuv.new(fts, current, &row_modifier) elsif Object.const_defined?(:EventMachine) && EM.reactor_thread? ResultsEM.new(fts, &row_modifier) else ResultsNative.new(fts, &row_modifier) end end FtsDefaults = { include_docs: true, size: 10000, # Max result size from: 0, explain: false } # Returns an n1ql query builder. # # @return [Libcouchbase::N1QL] def n1ql(**options) N1QL.new(self, **options) end # Update or create design doc with supplied views # # @see http://docs.couchbase.com/admin/admin/REST/rest-ddocs-create.html # # @param [Hash, IO, String] data The source object containing JSON # encoded design document. def save_design_doc(data, id = nil, async: false) attrs = case data when String JSON.parse(data, Connection::DECODE_OPTIONS) when IO JSON.parse(data.read, Connection::DECODE_OPTIONS) when Hash data else raise ArgumentError, "Document should be Hash, String or IO instance" end attrs[:language] ||= :javascript id ||= attrs.delete(:_id) id = id.to_s.sub(/^_design\//, '') result @connection.http("/_design/#{id}", method: :put, body: attrs, type: :view ), async end # Delete design doc with given id and optional revision. # # @see http://docs.couchbase.com/admin/admin/REST/rest-ddocs-delete.html # # @param [String, Symbol] id ID of the design doc # @param [String] rev Optional revision def delete_design_doc(id, rev = nil, async: false) id = id.to_s.sub(/^_design\//, '') rev = "?rev=#{rev}" if rev result @connection.http("/_design/#{id}#{rev}", method: :delete, type: :view), async end # Compare and swap value. # # Reads a key's value from the server and yields it to a block. Replaces # the key's value with the result of the block as long as the key hasn't # been updated in the meantime, otherwise raises # {Libcouchbase::Error::KeyExists}. # # Setting the +:retry+ option to a positive number will cause this method # to rescue the {Libcouchbase::Error::KeyExists} error that happens when # an update collision is detected, and automatically get a fresh copy # of the value and retry the block. This will repeat as long as there # continues to be conflicts, up to the maximum number of retries specified. # # @param [String, Symbol] key # # @param [Hash] options the options for "swap" part # @option options [Fixnum] :retry (0) maximum number of times to autmatically retry upon update collision # # @yieldparam [Object] value existing value # @yieldreturn [Object] new value. # # @raise [Couchbase::Error::KeyExists] if the key was updated before the the # code in block has been completed (the CAS value has been changed). # @raise [ArgumentError] if the block is missing # # @example Implement append to JSON encoded value # # c.set(:foo, {bar: 1}) # c.cas(:foo) do |val| # val[:baz] = 2 # val # end # c.get(:foo) #=> {bar: 1, baz: 2} # # @return [Libcouchbase::Response] the transaction details including the new CAS def compare_and_swap(key, **opts) retries = opts.delete(:retry) || 0 begin current = result(@connection.get(key)) new_value = yield current.value, opts opts[:cas] = current.cas set(key, new_value, **opts) rescue Libcouchbase::Error::KeyExists retries -= 1 retry if retries >= 0 raise end end alias_method :cas, :compare_and_swap # The numbers of the replicas for each node in the cluster # @return [Integer] def get_num_replicas result @connection.get_num_replicas end # The numbers of nodes in the cluster # @return [Integer] def get_num_nodes result @connection.get_num_nodes end # Waits for all the async operations to complete and returns the results # # @return [Array] def wait_results(*results) result ::Libuv::Q.all(@reactor, *results.flatten) end protected def result(promise, async = false) return promise if async current = ::Libuv::Reactor.current if current && current.running? co promise elsif Object.const_defined?(:EventMachine) && EM.reactor_thread? # Assume this is being run in em-synchrony f = Fiber.current error = nil response = nil @connection.reactor.next_tick do begin response = co(promise) rescue => e error = e end EM.next_tick { f.resume } end Fiber.yield raise error if error response else request = Mutex.new result = ConditionVariable.new error = nil response = nil request.synchronize { @connection.reactor.next_tick do begin response = co(promise) rescue => e error = e end # Odds are we won't actually block here request.synchronize { result.signal } end result.wait(request) } raise error if error response end end def connect if @connection.reactor.running? # We don't need to start a reactor so we use the regular helper result(@connection.connect) elsif Object.const_defined?(:EventMachine) && EM.reactor_thread? start_reactor_and_em_connect else start_reactor_and_connect end end # This blocks on the current thread def start_reactor_and_connect connecting = Mutex.new result = ConditionVariable.new error = nil connecting.synchronize { Thread.new do @connection.reactor.run do |reactor| reactor.ref attempt = 0 begin co @connection.connect rescue Libcouchbase::Error::ConnectError => e attempt += 1 if attempt < 3 reactor.sleep 100 # Requires a new connection object or the retry will always fail @connection = Connection.new(**@connection_options) retry end error = e rescue => e error = e end # Odds are we won't actually block here connecting.synchronize { result.signal } end end result.wait(connecting) } raise error if error end # Assume this is being run in em-synchrony def start_reactor_and_em_connect f = Fiber.current error = nil Thread.new do @connection.reactor.run do |reactor| reactor.ref attempt = 0 begin co @connection.connect rescue Libcouchbase::Error::ConnectError => e attempt += 1 if attempt < 3 reactor.sleep 100 # Requires a new connection object or the retry will always fail @connection = Connection.new(**@connection_options) retry end error = e rescue => e error = e end EM.next_tick { f.resume } end end Fiber.yield raise error if error end end end