lib/aerospike/client.rb in aerospike-2.22.0 vs lib/aerospike/client.rb in aerospike-2.23.0

- old
+ new

@@ -622,97 +622,72 @@ #------------------------------------------------------- # Scan Operations #------------------------------------------------------- - def scan_all(namespace, set_name, bin_names = nil, options = nil) + # Reads records in specified namespace and set using partition filter. + # If the policy's concurrent_nodes is specified, each server node will be read in + # parallel. Otherwise, server nodes are read sequentially. + # If partition_filter is nil, all partitions will be scanned. + # If the policy is nil, the default relevant policy will be used. + # This method is only supported by Aerospike 4.9+ servers. + def scan_partitions(partition_filter, namespace, set_name, bin_names = nil, options = nil) policy = create_policy(options, ScanPolicy, default_scan_policy) - # wait until all migrations are finished - # TODO: implement - # @cluster.WaitUntillMigrationIsFinished(policy.timeout) - # Retry policy must be one-shot for scans. # copy on write for policy new_policy = policy.clone nodes = @cluster.nodes if nodes.empty? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.") end - recordset = Recordset.new(policy.record_queue_size, nodes.length, :scan) - - if policy.concurrent_nodes - # Use a thread per node - nodes.each do |node| - partitions = node.cluster.node_partitions(node, namespace) - Thread.new do - Thread.current.abort_on_exception = true - command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset, partitions) - begin - execute_command(command) - rescue => e - Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION - recordset.cancel(e) - ensure - recordset.thread_finished - end - end - end - else - Thread.new do - Thread.current.abort_on_exception = true - nodes.each do |node| - partitions = node.cluster.node_partitions(node, namespace) - command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset, partitions) - begin - execute_command(command) - rescue => e - Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION - recordset.cancel(e) - ensure - recordset.thread_finished - end - end - end + tracker = Aerospike::PartitionTracker.new(policy, nodes, partition_filter) + recordset = Recordset.new(policy.record_queue_size, 1, :scan) + Thread.new do + Thread.current.abort_on_exception = true + ScanExecutor.scan_partitions(policy, @cluster, tracker, namespace, set_name, recordset, bin_names) end recordset end - # ScanNode reads all records in specified namespace and set, from one node only. - # The policy can be used to specify timeouts. - def scan_node(node, namespace, set_name, bin_names = nil, options = nil) + # Reads all records in specified namespace and set for one node only. + # If the policy is nil, the default relevant policy will be used. + def scan_node_partitions(node, namespace, set_name, bin_names = nil, options = nil) policy = create_policy(options, ScanPolicy, default_scan_policy) - # wait until all migrations are finished - # TODO: implement - # @cluster.WaitUntillMigrationIsFinished(policy.timeout) # Retry policy must be one-shot for scans. # copy on write for policy new_policy = policy.clone - new_policy.max_retries = 0 - node = @cluster.get_node_by_name(node) unless node.is_a?(Aerospike::Node) + unless node.active? + raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.") + end + tracker = Aerospike::PartitionTracker.new(policy, [node]) recordset = Recordset.new(policy.record_queue_size, 1, :scan) - - partitions = node.cluster.node_partitions(node, namespace) Thread.new do Thread.current.abort_on_exception = true - command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset, partitions) - begin - execute_command(command) - rescue => e - Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION - recordset.cancel(e) - ensure - recordset.thread_finished - end + ScanExecutor.scan_partitions(policy, @cluster, tracker, namespace, set_name, recordset, bin_names) end recordset + end + + # Reads all records in specified namespace and set from all nodes. + # If the policy's concurrent_nodes is specified, each server node will be read in + # parallel. Otherwise, server nodes are read sequentially. + # If the policy is nil, the default relevant policy will be used. + def scan_all(namespace, set_name, bin_names = nil, options = nil) + scan_partitions(Aerospike::PartitionFilter.all, namespace, set_name, bin_names, options) + end + + # ScanNode reads all records in specified namespace and set, from one node only. + # The policy can be used to specify timeouts. + def scan_node(node, namespace, set_name, bin_names = nil, options = nil) + scan_node_partitions(node, namespace, set_name, bin_names, options) end #-------------------------------------------------------- # Query functions (Supported by Aerospike 3 servers only) #--------------------------------------------------------