lib/aerospike/client.rb in aerospike-2.19.0 vs lib/aerospike/client.rb in aerospike-2.20.0
- old
+ new
@@ -541,14 +541,15 @@
statement = statement.clone
statement.set_aggregate_function(package_name, function_name, function_args, false)
# Use a thread per node
nodes.each do |node|
+ partitions = node.cluster.node_partitions(node, statement.namespace)
Thread.new do
Thread.current.abort_on_exception = true
begin
- command = QueryCommand.new(node, policy, statement, nil)
+ command = QueryCommand.new(node, policy, statement, nil, partitions)
execute_command(command)
rescue => e
Aerospike.logger.error(e)
raise e
end
@@ -642,13 +643,14 @@
recordset = Recordset.new(policy.record_queue_size, nodes.length, :scan)
if policy.concurrent_nodes
# Use a thread per node
nodes.each do |node|
+ partitions = node.cluster.node_partitions(node, namespace)
Thread.new do
Thread.current.abort_on_exception = true
- command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset)
+ command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset, partitions)
begin
execute_command(command)
rescue => e
Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION
recordset.cancel(e)
@@ -659,11 +661,12 @@
end
else
Thread.new do
Thread.current.abort_on_exception = true
nodes.each do |node|
- command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset)
+ partitions = node.cluster.node_partitions(node, namespace)
+ command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset, partitions)
begin
execute_command(command)
rescue => e
Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION
recordset.cancel(e)
@@ -692,13 +695,14 @@
node = @cluster.get_node_by_name(node) unless node.is_a?(Aerospike::Node)
recordset = Recordset.new(policy.record_queue_size, 1, :scan)
+ partitions = node.cluster.node_partitions(node, namespace)
Thread.new do
Thread.current.abort_on_exception = true
- command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset)
+ command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset, partitions)
begin
execute_command(command)
rescue => e
Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION
recordset.cancel(e)
@@ -732,12 +736,13 @@
recordset = Recordset.new(policy.record_queue_size, nodes.length, :query)
# Use a thread per node
nodes.each do |node|
+ partitions = node.cluster.node_partitions(node, statement.namespace)
Thread.new do
Thread.current.abort_on_exception = true
- command = QueryCommand.new(node, new_policy, statement, recordset)
+ command = QueryCommand.new(node, new_policy, statement, recordset, partitions)
begin
execute_command(command)
rescue => e
Aerospike.logger.error(e.backtrace.join("\n")) unless e == QUERY_TERMINATED_EXCEPTION
recordset.cancel(e)