lib/aerospike/client.rb in aerospike-2.19.0 vs lib/aerospike/client.rb in aerospike-2.20.0

- old
+ new

@@ -541,14 +541,15 @@ statement = statement.clone statement.set_aggregate_function(package_name, function_name, function_args, false) # Use a thread per node nodes.each do |node| + partitions = node.cluster.node_partitions(node, statement.namespace) Thread.new do Thread.current.abort_on_exception = true begin - command = QueryCommand.new(node, policy, statement, nil) + command = QueryCommand.new(node, policy, statement, nil, partitions) execute_command(command) rescue => e Aerospike.logger.error(e) raise e end @@ -642,13 +643,14 @@ recordset = Recordset.new(policy.record_queue_size, nodes.length, :scan) if policy.concurrent_nodes # Use a thread per node nodes.each do |node| + partitions = node.cluster.node_partitions(node, namespace) Thread.new do Thread.current.abort_on_exception = true - command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset) + command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset, partitions) begin execute_command(command) rescue => e Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION recordset.cancel(e) @@ -659,11 +661,12 @@ end else Thread.new do Thread.current.abort_on_exception = true nodes.each do |node| - command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset) + partitions = node.cluster.node_partitions(node, namespace) + command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset, partitions) begin execute_command(command) rescue => e Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION recordset.cancel(e) @@ -692,13 +695,14 @@ node = @cluster.get_node_by_name(node) unless node.is_a?(Aerospike::Node) recordset = Recordset.new(policy.record_queue_size, 1, :scan) + partitions = node.cluster.node_partitions(node, namespace) Thread.new do Thread.current.abort_on_exception = true - command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset) + command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset, partitions) begin execute_command(command) rescue => e Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION recordset.cancel(e) @@ -732,12 +736,13 @@ recordset = Recordset.new(policy.record_queue_size, nodes.length, :query) # Use a thread per node nodes.each do |node| + partitions = node.cluster.node_partitions(node, statement.namespace) Thread.new do Thread.current.abort_on_exception = true - command = QueryCommand.new(node, new_policy, statement, recordset) + command = QueryCommand.new(node, new_policy, statement, recordset, partitions) begin execute_command(command) rescue => e Aerospike.logger.error(e.backtrace.join("\n")) unless e == QUERY_TERMINATED_EXCEPTION recordset.cancel(e)