lib/aerospike/client.rb in aerospike-3.0.0 vs lib/aerospike/client.rb in aerospike-4.0.0

- old
+ new

@@ -34,19 +34,11 @@ # # #=> raises Aerospike::Exceptions::Timeout if a +:timeout+ is specified and # +:fail_if_not_connected+ set to true class Client - attr_accessor :default_admin_policy - attr_accessor :default_batch_policy - attr_accessor :default_info_policy - attr_accessor :default_query_policy - attr_accessor :default_read_policy - attr_accessor :default_scan_policy - attr_accessor :default_write_policy - attr_accessor :default_operate_policy - attr_accessor :cluster + attr_accessor :default_admin_policy, :default_batch_policy, :default_info_policy, :default_query_policy, :default_read_policy, :default_scan_policy, :default_write_policy, :default_operate_policy, :cluster def initialize(hosts = nil, policy: ClientPolicy.new, connect: true) hosts = ::Aerospike::Host::Parse.(hosts || ENV["AEROSPIKE_HOSTS"] || "localhost") policy = create_policy(policy, ClientPolicy) set_default_policies(policy.policies) @@ -228,15 +220,15 @@ if set_name && !set_name.to_s.strip.empty? str_cmd = "truncate:namespace=#{namespace}" str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty? else - if node.supports_feature?(Aerospike::Features::TRUNCATE_NAMESPACE) - str_cmd = "truncate-namespace:namespace=#{namespace}" - else - str_cmd = "truncate:namespace=#{namespace}" - end + str_cmd = if node.supports_feature?(Aerospike::Features::TRUNCATE_NAMESPACE) + "truncate-namespace:namespace=#{namespace}" + else + "truncate:namespace=#{namespace}" + end end if before_last_update lut_nanos = (before_last_update.to_f * 1_000_000_000.0).round str_cmd << ";lut=#{lut_nanos}" @@ -327,19 +319,12 @@ when :none info_flags |= INFO1_NOBINDATA bin_names = nil end - if policy.use_batch_direct - key_map = BatchItem.generate_map(keys) - execute_batch_direct_commands(policy, keys) do |node, batch| - BatchDirectCommand.new(node, batch, policy, key_map, bin_names, results, info_flags) - end - else - execute_batch_index_commands(policy, keys) do |node, batch| - BatchIndexCommand.new(node, batch, policy, bin_names, results, info_flags) - end + execute_batch_index_commands(policy, keys) do |node, batch| + BatchIndexCommand.new(node, batch, policy, bin_names, results, info_flags) end results end @@ -349,26 +334,34 @@ # The policy can be used to specify timeouts and protocol type. def batch_get_header(keys, options = nil) batch_get(keys, :none, options) end + # Operate on multiple records for specified batch keys in one batch call. + # This method allows different namespaces/bins for each key in the batch. + # The returned records are located in the same list. + # + # records can be BatchRead, BatchWrite, BatchDelete or BatchUDF. + # + # Requires server version 6.0+ + def batch_operate(records, options = nil) + policy = create_policy(options, BatchPolicy, default_batch_policy) + + execute_batch_operate_commands(policy, records) do |node, batch| + BatchOperateCommand.new(node, batch, policy, records) + end + end + # Check if multiple record keys exist in one batch call. # The returned boolean array is in positional order with the original key array order. # The policy can be used to specify timeouts and protocol type. def batch_exists(keys, options = nil) policy = create_policy(options, BatchPolicy, default_batch_policy) results = Array.new(keys.length) - if policy.use_batch_direct - key_map = BatchItem.generate_map(keys) - execute_batch_direct_commands(policy, keys) do |node, batch| - BatchDirectExistsCommand.new(node, batch, policy, key_map, results) - end - else - execute_batch_index_commands(policy, keys) do |node, batch| - BatchIndexExistsCommand.new(node, batch, policy, results) - end + execute_batch_index_commands(policy, keys) do |node, batch| + BatchIndexExistsCommand.new(node, batch, policy, results) end results end @@ -428,11 +421,11 @@ res[k] = v end end if res["error"] - raise Aerospike::Exceptions::CommandRejected.new("Registration failed: #{res["error"]}\nFile: #{res["file"]}\nLine: #{res["line"]}\nMessage: #{res["message"]}") + raise Aerospike::Exceptions::CommandRejected.new("Registration failed: #{res['error']}\nFile: #{res['file']}\nLine: #{res['line']}\nMessage: #{res['message']}") end UdfRegisterTask.new(@cluster, server_path) end @@ -564,11 +557,12 @@ # index_type should be :string, :numeric or :geo2dsphere (requires server version 3.7 or later) # collection_type should be :list, :mapkeys or :mapvalues # ctx is an optional list of context. Supported on server v6.1+. def create_index(namespace, set_name, index_name, bin_name, index_type, collection_type = nil, options = nil, ctx: nil) if options.nil? && collection_type.is_a?(Hash) - options, collection_type = collection_type, nil + options = collection_type + collection_type = nil end policy = create_policy(options, Policy, default_info_policy) str_cmd = "sindex-create:ns=#{namespace}" str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty? @@ -941,17 +935,15 @@ when policy_klass policy when Hash policy_klass.new(policy) else - fail TypeError, "policy should be a #{policy_klass.name} instance or a Hash" + raise TypeError, "policy should be a #{policy_klass.name} instance or a Hash" end end - def cluster=(cluster) - @cluster = cluster - end + attr_writer :cluster def cluster_config_changed(cluster) Aerospike.logger.debug { "Cluster config change detected; active nodes: #{cluster.nodes.map(&:name)}" } setup_command_validators end @@ -997,27 +989,21 @@ end threads.each(&:join) end - def execute_batch_direct_commands(policy, keys) + def execute_batch_operate_commands(policy, records) if @cluster.nodes.empty? - raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing Batch Direct command failed because cluster is empty.") + raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing Batch Index command failed because cluster is empty.") end - batch_nodes = BatchDirectNode.generate_list(@cluster, policy.replica, keys) + batch_nodes = BatchOperateNode.generate_list(@cluster, policy.replica, records) threads = [] - # Use a thread per namespace per node - batch_nodes.each do |batch_node| - # copy to avoid race condition - bn = batch_node - bn.batch_namespaces.each do |batch| - threads << Thread.new do - Thread.current.abort_on_exception = true - command = yield batch_node.node, batch - execute_command(command) - end + batch_nodes.each do |batch| + threads << Thread.new do + command = yield batch.node, batch + execute_command(command) end end threads.each(&:join) end