lib/aerospike/client.rb in aerospike-3.0.0 vs lib/aerospike/client.rb in aerospike-4.0.0
- old
+ new
@@ -34,19 +34,11 @@
#
# #=> raises Aerospike::Exceptions::Timeout if a +:timeout+ is specified and
# +:fail_if_not_connected+ set to true
class Client
- attr_accessor :default_admin_policy
- attr_accessor :default_batch_policy
- attr_accessor :default_info_policy
- attr_accessor :default_query_policy
- attr_accessor :default_read_policy
- attr_accessor :default_scan_policy
- attr_accessor :default_write_policy
- attr_accessor :default_operate_policy
- attr_accessor :cluster
+ attr_accessor :default_admin_policy, :default_batch_policy, :default_info_policy, :default_query_policy, :default_read_policy, :default_scan_policy, :default_write_policy, :default_operate_policy, :cluster
def initialize(hosts = nil, policy: ClientPolicy.new, connect: true)
hosts = ::Aerospike::Host::Parse.(hosts || ENV["AEROSPIKE_HOSTS"] || "localhost")
policy = create_policy(policy, ClientPolicy)
set_default_policies(policy.policies)
@@ -228,15 +220,15 @@
if set_name && !set_name.to_s.strip.empty?
str_cmd = "truncate:namespace=#{namespace}"
str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty?
else
- if node.supports_feature?(Aerospike::Features::TRUNCATE_NAMESPACE)
- str_cmd = "truncate-namespace:namespace=#{namespace}"
- else
- str_cmd = "truncate:namespace=#{namespace}"
- end
+ str_cmd = if node.supports_feature?(Aerospike::Features::TRUNCATE_NAMESPACE)
+ "truncate-namespace:namespace=#{namespace}"
+ else
+ "truncate:namespace=#{namespace}"
+ end
end
if before_last_update
lut_nanos = (before_last_update.to_f * 1_000_000_000.0).round
str_cmd << ";lut=#{lut_nanos}"
@@ -327,19 +319,12 @@
when :none
info_flags |= INFO1_NOBINDATA
bin_names = nil
end
- if policy.use_batch_direct
- key_map = BatchItem.generate_map(keys)
- execute_batch_direct_commands(policy, keys) do |node, batch|
- BatchDirectCommand.new(node, batch, policy, key_map, bin_names, results, info_flags)
- end
- else
- execute_batch_index_commands(policy, keys) do |node, batch|
- BatchIndexCommand.new(node, batch, policy, bin_names, results, info_flags)
- end
+ execute_batch_index_commands(policy, keys) do |node, batch|
+ BatchIndexCommand.new(node, batch, policy, bin_names, results, info_flags)
end
results
end
@@ -349,26 +334,34 @@
# The policy can be used to specify timeouts and protocol type.
def batch_get_header(keys, options = nil)
batch_get(keys, :none, options)
end
+ # Operate on multiple records for specified batch keys in one batch call.
+ # This method allows different namespaces/bins for each key in the batch.
+ # The returned records are located in the same list.
+ #
+ # records can be BatchRead, BatchWrite, BatchDelete or BatchUDF.
+ #
+ # Requires server version 6.0+
+ def batch_operate(records, options = nil)
+ policy = create_policy(options, BatchPolicy, default_batch_policy)
+
+ execute_batch_operate_commands(policy, records) do |node, batch|
+ BatchOperateCommand.new(node, batch, policy, records)
+ end
+ end
+
# Check if multiple record keys exist in one batch call.
# The returned boolean array is in positional order with the original key array order.
# The policy can be used to specify timeouts and protocol type.
def batch_exists(keys, options = nil)
policy = create_policy(options, BatchPolicy, default_batch_policy)
results = Array.new(keys.length)
- if policy.use_batch_direct
- key_map = BatchItem.generate_map(keys)
- execute_batch_direct_commands(policy, keys) do |node, batch|
- BatchDirectExistsCommand.new(node, batch, policy, key_map, results)
- end
- else
- execute_batch_index_commands(policy, keys) do |node, batch|
- BatchIndexExistsCommand.new(node, batch, policy, results)
- end
+ execute_batch_index_commands(policy, keys) do |node, batch|
+ BatchIndexExistsCommand.new(node, batch, policy, results)
end
results
end
@@ -428,11 +421,11 @@
res[k] = v
end
end
if res["error"]
- raise Aerospike::Exceptions::CommandRejected.new("Registration failed: #{res["error"]}\nFile: #{res["file"]}\nLine: #{res["line"]}\nMessage: #{res["message"]}")
+ raise Aerospike::Exceptions::CommandRejected.new("Registration failed: #{res['error']}\nFile: #{res['file']}\nLine: #{res['line']}\nMessage: #{res['message']}")
end
UdfRegisterTask.new(@cluster, server_path)
end
@@ -564,11 +557,12 @@
# index_type should be :string, :numeric or :geo2dsphere (requires server version 3.7 or later)
# collection_type should be :list, :mapkeys or :mapvalues
# ctx is an optional list of context. Supported on server v6.1+.
def create_index(namespace, set_name, index_name, bin_name, index_type, collection_type = nil, options = nil, ctx: nil)
if options.nil? && collection_type.is_a?(Hash)
- options, collection_type = collection_type, nil
+ options = collection_type
+ collection_type = nil
end
policy = create_policy(options, Policy, default_info_policy)
str_cmd = "sindex-create:ns=#{namespace}"
str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty?
@@ -941,17 +935,15 @@
when policy_klass
policy
when Hash
policy_klass.new(policy)
else
- fail TypeError, "policy should be a #{policy_klass.name} instance or a Hash"
+ raise TypeError, "policy should be a #{policy_klass.name} instance or a Hash"
end
end
- def cluster=(cluster)
- @cluster = cluster
- end
+ attr_writer :cluster
def cluster_config_changed(cluster)
Aerospike.logger.debug { "Cluster config change detected; active nodes: #{cluster.nodes.map(&:name)}" }
setup_command_validators
end
@@ -997,27 +989,21 @@
end
threads.each(&:join)
end
- def execute_batch_direct_commands(policy, keys)
+ def execute_batch_operate_commands(policy, records)
if @cluster.nodes.empty?
- raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing Batch Direct command failed because cluster is empty.")
+ raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing Batch Index command failed because cluster is empty.")
end
- batch_nodes = BatchDirectNode.generate_list(@cluster, policy.replica, keys)
+ batch_nodes = BatchOperateNode.generate_list(@cluster, policy.replica, records)
threads = []
- # Use a thread per namespace per node
- batch_nodes.each do |batch_node|
- # copy to avoid race condition
- bn = batch_node
- bn.batch_namespaces.each do |batch|
- threads << Thread.new do
- Thread.current.abort_on_exception = true
- command = yield batch_node.node, batch
- execute_command(command)
- end
+ batch_nodes.each do |batch|
+ threads << Thread.new do
+ command = yield batch.node, batch
+ execute_command(command)
end
end
threads.each(&:join)
end