lib/aerospike/client.rb in aerospike-2.22.0 vs lib/aerospike/client.rb in aerospike-2.23.0
- old
+ new
@@ -622,97 +622,72 @@
#-------------------------------------------------------
# Scan Operations
#-------------------------------------------------------
- def scan_all(namespace, set_name, bin_names = nil, options = nil)
+ # Reads records in specified namespace and set using partition filter.
+ # If the policy's concurrent_nodes is specified, each server node will be read in
+ # parallel. Otherwise, server nodes are read sequentially.
+ # If partition_filter is nil, all partitions will be scanned.
+ # If the policy is nil, the default relevant policy will be used.
+ # This method is only supported by Aerospike 4.9+ servers.
+ def scan_partitions(partition_filter, namespace, set_name, bin_names = nil, options = nil)
policy = create_policy(options, ScanPolicy, default_scan_policy)
- # wait until all migrations are finished
- # TODO: implement
- # @cluster.WaitUntillMigrationIsFinished(policy.timeout)
-
# Retry policy must be one-shot for scans.
# copy on write for policy
new_policy = policy.clone
nodes = @cluster.nodes
if nodes.empty?
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.")
end
- recordset = Recordset.new(policy.record_queue_size, nodes.length, :scan)
-
- if policy.concurrent_nodes
- # Use a thread per node
- nodes.each do |node|
- partitions = node.cluster.node_partitions(node, namespace)
- Thread.new do
- Thread.current.abort_on_exception = true
- command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset, partitions)
- begin
- execute_command(command)
- rescue => e
- Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION
- recordset.cancel(e)
- ensure
- recordset.thread_finished
- end
- end
- end
- else
- Thread.new do
- Thread.current.abort_on_exception = true
- nodes.each do |node|
- partitions = node.cluster.node_partitions(node, namespace)
- command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset, partitions)
- begin
- execute_command(command)
- rescue => e
- Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION
- recordset.cancel(e)
- ensure
- recordset.thread_finished
- end
- end
- end
+ tracker = Aerospike::PartitionTracker.new(policy, nodes, partition_filter)
+ recordset = Recordset.new(policy.record_queue_size, 1, :scan)
+ Thread.new do
+ Thread.current.abort_on_exception = true
+ ScanExecutor.scan_partitions(policy, @cluster, tracker, namespace, set_name, recordset, bin_names)
end
recordset
end
- # ScanNode reads all records in specified namespace and set, from one node only.
- # The policy can be used to specify timeouts.
- def scan_node(node, namespace, set_name, bin_names = nil, options = nil)
+ # Reads all records in specified namespace and set for one node only.
+ # If the policy is nil, the default relevant policy will be used.
+ def scan_node_partitions(node, namespace, set_name, bin_names = nil, options = nil)
policy = create_policy(options, ScanPolicy, default_scan_policy)
- # wait until all migrations are finished
- # TODO: implement
- # @cluster.WaitUntillMigrationIsFinished(policy.timeout)
# Retry policy must be one-shot for scans.
# copy on write for policy
new_policy = policy.clone
- new_policy.max_retries = 0
- node = @cluster.get_node_by_name(node) unless node.is_a?(Aerospike::Node)
+ unless node.active?
+ raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.")
+ end
+ tracker = Aerospike::PartitionTracker.new(policy, [node])
recordset = Recordset.new(policy.record_queue_size, 1, :scan)
-
- partitions = node.cluster.node_partitions(node, namespace)
Thread.new do
Thread.current.abort_on_exception = true
- command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset, partitions)
- begin
- execute_command(command)
- rescue => e
- Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION
- recordset.cancel(e)
- ensure
- recordset.thread_finished
- end
+ ScanExecutor.scan_partitions(policy, @cluster, tracker, namespace, set_name, recordset, bin_names)
end
recordset
+ end
+
+ # Reads all records in specified namespace and set from all nodes.
+ # If the policy's concurrent_nodes is specified, each server node will be read in
+ # parallel. Otherwise, server nodes are read sequentially.
+ # If the policy is nil, the default relevant policy will be used.
+ def scan_all(namespace, set_name, bin_names = nil, options = nil)
+ scan_partitions(Aerospike::PartitionFilter.all, namespace, set_name, bin_names, options)
+ end
+
+ # ScanNode reads all records in specified namespace and set, from one node only.
+ # The policy can be used to specify timeouts.
+ def scan_node(node, namespace, set_name, bin_names = nil, options = nil)
+ scan_node_partitions(node, namespace, set_name, bin_names, options)
end
#--------------------------------------------------------
# Query functions (Supported by Aerospike 3 servers only)
#--------------------------------------------------------