require 'pry' module Couchbase class Bucket import java.io.IOException import java.net.SocketAddress import java.net.URI import java.net.URISyntaxException import java.util.ArrayList import java.util.LinkedList import java.util.List import java.util.concurrent.Future import java.util.concurrent.TimeUnit import com.couchbase.client.CouchbaseClient import com.couchbase.client.CouchbaseConnectionFactory import com.couchbase.client.protocol.views.Query include Couchbase::Operations include Couchbase::Async attr_accessor :quiet, :hostname, :port, :pool, :bucket, :username, :password, :default_ttl, :timeout, :default_arithmetic_init, :transcoder attr_reader :client, :key_prefix, :default_format # Initialize new Bucket. # # @since 1.0.0 # # @overload initialize(url, options = {}) # Initialize bucket using URI of the cluster and options. It is possible # to override some parts of URI using the options keys (e.g. :host or # :port) # # @param [String] url The full URL of management API of the cluster. # @param [Hash] options The options for connection. See options definition # below. # # @overload initialize(options = {}) # Initialize bucket using options only. # # @param [Hash] options The options for operation for connection # @option options [Array] :node_list (nil) the list of nodes to connect # to. If specified it takes precedence over +:host+ option. The list # must be array of strings in form of host names or host names with # ports (in first case port 8091 will be used, see examples). # @option options [String] :host ("localhost") the hostname or IP address # of the node # @option options [Fixnum] :port (8091) the port of the managemenent API # @option options [String] :pool ("default") the pool name # @option options [String] :bucket ("default") the bucket name # @option options [Fixnum] :default_ttl (0) the TTL used by default during # storing key-value pairs. # @option options [Fixnum] :default_flags (0) the default flags. # @option options [Symbol] :default_format (:document) the format, which # will be used for values by default. Note that changing format will # amend flags. (see {Bucket#default_format}) # @option options [String] :username (nil) the user name to connect to the # cluster. Used to authenticate on management API. The username could # be skipped for protected buckets, the bucket name will be used # instead. # @option options [String] :password (nil) the password of the user. # @option options [true, false] :quiet (false) the flag controlling if raising # exception when the client executes operations on non-existent keys. If it # is +true+ it will raise {Couchbase::Error::NotFound} exceptions. The # default behaviour is to return +nil+ value silently (might be useful in # Rails cache). # @option options [Symbol] :environment (:production) the mode of the # connection. Currently it influences only on design documents set. If # the environment is +:development+, you will able to get design # documents with 'dev_' prefix, otherwise (in +:production+ mode) the # library will hide them from you. # @option options [String] :key_prefix (nil) the prefix string which will # be prepended to each key before sending out, and sripped before # returning back to the application. # @option options [Fixnum] :timeout (2500000) the timeout for IO # operations (in microseconds) # @option options [Fixnum, true] :default_arithmetic_init (0) the default # initial value for arithmetic operations. Setting this option to any # non positive number forces creation missing keys with given default # value. Setting it to +true+ will use zero as initial value. (see # {Bucket#incr} and {Bucket#decr}). # @option options [Symbol] :engine (:default) the IO engine to use # Currently following engines are supported: # :default :: Built-in engine (multi-thread friendly) # :libevent :: libevent IO plugin from libcouchbase (optional) # :libev :: libev IO plugin from libcouchbase (optional) # :eventmachine :: EventMachine plugin (builtin, but requires EM gem and ruby 1.9+) # @option options [true, false] :async (false) If true, the # connection instance will be considered always asynchronous and # IO interaction will be occured only when {Couchbase::Bucket#run} # called. See {Couchbase::Bucket#on_connect} to hook your code # after the instance will be connected. # # @example Initialize connection using default options # Couchbase.new # # @example Select custom bucket # Couchbase.new(:bucket => 'foo') # Couchbase.new('http://localhost:8091/pools/default/buckets/foo') # # @example Connect to protected bucket # Couchbase.new(:bucket => 'protected', :username => 'protected', :password => 'secret') # Couchbase.new('http://localhost:8091/pools/default/buckets/protected', # :username => 'protected', :password => 'secret') # # @example Use list of nodes, in case some nodes might be dead # Couchbase.new(:node_list => ['example.com:8091', 'example.org:8091', 'example.net']) # # @raise [Couchbase::Error::BucketNotFound] if there is no such bucket to # connect to # # @raise [Couchbase::Error::Connect] if the socket wasn't accessible # (doesn't accept connections or doesn't respond in time) # # @return [Bucket] # def initialize(url = nil, options = {}) default_options = { type: nil, quiet: false, hostname: 'localhost', port: 8091, pool: 'default', bucket: 'default', password: '', engine: nil, default_ttl: 0, async: false, default_arithmetic_init: 0, default_flags: 0, default_format: :document, default_observe_timeout: 2500000, on_error: nil, on_connect: nil, timeout: 0, environment: nil, key_prefix: nil, node_list: nil, destroying: 0, connected: 0, on_connect_proc: nil, async_disconnect_hook_set: 0, connected: false } url_options = if url.is_a? String raise ArgumentError.new unless url =~ /^http:\/\// uri = URI.new(url) { host: uri.host, port: uri.port, }.merge(path_to_pool_and_bucket(uri.path)) elsif url.nil? {} else url end connection_options = default_options.merge(options).merge(url_options) connection_options.each_pair do |key, value| instance_variable_set("@#{key}", value) end @transcoder = case @default_format when :document Transcoder::Document when :marshal Transcoder::Marshal when :plain Transcoder::Plain end connect unless async? end def quiet? !!quiet end def host hostname end def connect # TODO: doesn't work ObjectSpace.define_finalizer(self, -> conn { conn.disconnect }) uris = if @node_list Array(@node_list).map { |n| URI.new(n) } else Array(URI.new(base_url)) end begin @connection_factory = CouchbaseConnectionFactory.new(uris, bucket.to_java_string, password.to_java_string) @client = CouchbaseClient.new(@connection_factory) @connected = true rescue Java::ComCouchbaseClientVbucket::ConfigurationException #, #Java::Io::IOException raise Couchbase::Error::Auth.new rescue java.net.ConnectException => e raise Couchbase::Error::Connect.new end self end alias_method :reconnect, :connect def authority "#{hostname}:#{port}" end def base_url "http://#{authority}/pools" end def url "http://#{authority}/pools/#{pool}/buckets/#{bucket}/" end def connected? @connected end def disconnect if connected? @client.shutdown(3, TimeUnit::SECONDS) @client = nil @connection_factory = nil @connected = false else raise Couchbase::Error::Connect.new end end def on_connect(&block) @on_connect = block end def on_error(&block) @on_error = block end # Compare and swap value. # # @since 1.0.0 # # Reads a key's value from the server and yields it to a block. Replaces # the key's value with the result of the block as long as the key hasn't # been updated in the meantime, otherwise raises # {Couchbase::Error::KeyExists}. CAS stands for "compare and swap", and # avoids the need for manual key mutexing. Read more info here: # # In asynchronous mode it will yield result twice, first for # {Bucket#get} with {Result#operation} equal to +:get+ and # second time for {Bucket#set} with {Result#operation} equal to +:set+. # # @see http://couchbase.com/docs/memcached-api/memcached-api-protocol-text_cas.html # # @param [String, Symbol] key # # @param [Hash] options the options for "swap" part # @option options [Fixnum] :ttl (self.default_ttl) the time to live of this key # @option options [Symbol] :format (self.default_format) format of the value # @option options [Fixnum] :flags (self.default_flags) flags for this key # # @yieldparam [Object, Result] value old value in synchronous mode and # +Result+ object in asynchronous mode. # @yieldreturn [Object] new value. # # @raise [Couchbase::Error::KeyExists] if the key was updated before the the # code in block has been completed (the CAS value has been changed). # @raise [ArgumentError] if the block is missing for async mode # # @example Implement append to JSON encoded value # # c.default_format = :document # c.set("foo", {"bar" => 1}) # c.cas("foo") do |val| # val["baz"] = 2 # val # end # c.get("foo") #=> {"bar" => 1, "baz" => 2} # # @example Append JSON encoded value asynchronously # # c.default_format = :document # c.set("foo", {"bar" => 1}) # c.run do # c.cas("foo") do |val| # case val.operation # when :get # val["baz"] = 2 # val # when :set # # verify all is ok # puts "error: #{ret.error.inspect}" unless ret.success? # end # end # end # c.get("foo") #=> {"bar" => 1, "baz" => 2} # # @return [Fixnum] the CAS of new value def cas(key, options = {}) if async? block = Proc.new get(key) do |ret| val = block.call(ret) # get new value from caller set(ret.key, val, options.merge(:cas => ret.cas, :flags => ret.flags), &block) end else val, flags, ver = get(key, :extended => true) val = yield(val) # get new value from caller set(key, val, options.merge(:cas => ver, :flags => flags)) end end alias :compare_and_swap :cas # Fetch design docs stored in current bucket # # @since 1.2.0 # # @return [Hash] def design_docs req = make_http_request("/pools/default/buckets/#{bucket}/ddocs", :type => :management, :extended => true) docmap = {} req.on_body do |body| res = MultiJson.load(body.value) res["rows"].each do |obj| if obj['doc'] obj['doc']['value'] = obj['doc'].delete('json') end doc = DesignDoc.wrap(self, obj) key = doc.id.sub(/^_design\//, '') next if self.environment == :production && key =~ /dev_/ docmap[key] = doc end yield(docmap) if block_given? end req.continue async? ? nil : docmap end # Update or create design doc with supplied views # # @since 1.2.0 # # @param [Hash, IO, String] data The source object containing JSON # encoded design document. It must have +_id+ key set, this key # should start with +_design/+. # # @return [true, false] def save_design_doc(data) attrs = case data when String MultiJson.load(data) when IO MultiJson.load(data.read) when Hash data else raise ArgumentError, "Document should be Hash, String or IO instance" end rv = nil id = attrs.delete('_id').to_s attrs['language'] ||= 'javascript' if id !~ /\A_design\// rv = Result.new(:operation => :http_request, :key => id, :error => ArgumentError.new("'_id' key must be set and start with '_design/'.")) yield rv if block_given? raise rv.error unless async? end req = make_http_request(id, :body => MultiJson.dump(attrs), :method => :put, :extended => true) req.on_body do |res| rv = res val = MultiJson.load(res.value) if block_given? if res.success? && val['error'] res.error = Error::View.new("save_design_doc", val['error']) end yield(res) end end req.continue unless async? rv.success? or raise res.error end end # Delete design doc with given id and revision. # # @since 1.2.0 # # @param [String] id Design document id. It might have '_design/' # prefix. # # @param [String] rev Document revision. It uses latest revision if # +rev+ parameter is nil. # # @return [true, false] def delete_design_doc(id, rev = nil) ddoc = design_docs[id.sub(/^_design\//, '')] unless ddoc yield nil if block_given? return nil end path = Utils.build_query(ddoc.id, :rev => rev || ddoc.meta['rev']) req = make_http_request(path, :method => :delete, :extended => true) rv = nil req.on_body do |res| rv = res val = MultiJson.load(res.value) if block_given? if res.success? && val['error'] res.error = Error::View.new("delete_design_doc", val['error']) end yield(res) end end req.continue unless async? rv.success? or raise res.error end end # Delete contents of the bucket # # @see http://www.couchbase.com/docs/couchbase-manual-2.0/restapi-flushing-bucket.html # # @since 1.2.0.beta # # @yieldparam [Result] ret the object with +error+, +status+ and +operation+ # attributes. # # @raise [Couchbase::Error::Protocol] in case of an error is # encountered. Check {Couchbase::Error::Base#status} for detailed code. # # @return [true] always return true (see raise section) # # @example Simple flush the bucket # c.flush #=> true # # @example Asynchronous flush # c.run do # c.flush do |ret| # ret.operation #=> :flush # ret.success? #=> true # ret.status #=> 200 # end # end def flush if !async? && block_given? sync_block_error end req = make_http_request("/pools/default/buckets/#{bucket}/controller/doFlush", :type => :management, :method => :post, :extended => true) res = nil req.on_body do |r| res = r res.instance_variable_set("@operation", :flush) yield(res) if block_given? end req.continue true end # Create and register one-shot timer # # @return [Couchbase::Timer] def create_timer(interval, &block) Timer.new(self, interval, &block) end # Create and register periodic timer # # @return [Couchbase::Timer] def create_periodic_timer(interval, &block) Timer.new(self, interval, :periodic => true, &block) end # Wait for persistence condition # # @since 1.2.0.dp6 # # This operation is useful when some confidence needed regarding the # state of the keys. With two parameters +:replicated+ and +:persisted+ # it allows to set up the waiting rule. # # @param [String, Symbol, Array, Hash] keys The list of the keys to # observe. Full form is hash with key-cas value pairs, but there are # also shortcuts like just Array of keys or single key. CAS value # needed to when you need to ensure that the storage persisted exactly # the same version of the key you are asking to observe. # @param [Hash] options The options for operation # @option options [Fixnum] :timeout The timeout in microseconds # @option options [Fixnum] :replicated How many replicas should receive # the copy of the key. # @option options [Fixnum] :persisted How many nodes should store the # key on the disk. # # @raise [Couchbase::Error::Timeout] if the given time is up # # @return [Fixnum, Hash] will return CAS value just like # mutators or pairs key-cas in case of multiple keys. def observe_and_wait(*keys, &block) options = {:timeout => default_observe_timeout} options.update(keys.pop) if keys.size > 1 && keys.last.is_a?(Hash) verify_observe_options(options) if block && !async? raise ArgumentError, "synchronous mode doesn't support callbacks" end if keys.size == 0 raise ArgumentError, "at least one key is required" end if keys.size == 1 && keys[0].is_a?(Hash) key_cas = keys[0] else key_cas = keys.flatten.reduce({}) do |h, kk| h[kk] = nil # set CAS to nil h end end if async? do_observe_and_wait(key_cas, options, &block) else res = do_observe_and_wait(key_cas, options, &block) while res.nil? unless async? if keys.size == 1 && (keys[0].is_a?(String) || keys[0].is_a?(Symbol)) return res.values.first else return res end end end end private def path_to_pool_and_bucket(path) {} end def verify_observe_options(options) unless num_replicas raise Couchbase::Error::Libcouchbase, "cannot detect number of the replicas" end unless options[:persisted] || options[:replicated] raise ArgumentError, "either :persisted or :replicated option must be set" end if options[:persisted] && !(1..num_replicas + 1).include?(options[:persisted]) raise ArgumentError, "persisted number should be in range (1..#{num_replicas + 1})" end if options[:replicated] && !(1..num_replicas).include?(options[:replicated]) raise ArgumentError, "replicated number should be in range (1..#{num_replicas})" end end def do_observe_and_wait(keys, options, &block) acc = Hash.new do |h, k| h[k] = Hash.new(0) h[k][:cas] = [keys[k]] # first position is for master node h[k] end check_condition = lambda do ok = catch :break do acc.each do |key, stats| master = stats[:cas][0] if master.nil? # master node doesn't have the key throw :break end if options[:persisted] && (stats[:persisted] < options[:persisted] || stats[:cas].count(master) != options[:persisted]) throw :break end if options[:replicated] && (stats[:replicated] < options[:replicated] || stats[:cas].count(master) != options[:replicated] + 1) throw :break end end true end if ok if async? options[:timer].cancel if options[:timer] keys.each do |k, _| block.call(Result.new(:key => k, :cas => acc[k][:cas][0], :operation => :observe_and_wait)) end return :async else return keys.inject({}){|res, (k, _)| res[k] = acc[k][:cas][0]; res} end else options[:timeout] /= 2 if options[:timeout] > 0 if async? options[:timer] = create_timer(options[:timeout]) do do_observe_and_wait(keys, options, &block) end return :async else # do wait for timeout run { create_timer(options[:timeout]){} } # return nil to avoid recursive call return nil end else err = Couchbase::Error::Timeout.new("the observe request was timed out") err.instance_variable_set("@operation", :observe_and_wait) if async? keys.each do |k, _| block.call(Result.new(:key => k, :cas => acc[k][:cas][0], :operation => :observe_and_wait, :error => err)) end return :async else err.instance_variable_set("@key", keys.keys) raise err end end end end collect = lambda do |results| results.each do |res| if res.completed? check_condition.call if async? else if res.from_master? acc[res.key][:cas][0] = res.cas else acc[res.key][:cas] << res.cas end acc[res.key][res.status] += 1 if res.status == :persisted acc[res.key][:replicated] += 1 end end end end if async? observe(keys.keys, options, &collect) else observe(keys.keys, options).each{|_, v| collect.call(v)} check_condition.call end end end end