lib/aerospike/client.rb in aerospike-2.23.0 vs lib/aerospike/client.rb in aerospike-2.24.0

- old
+ new

@@ -691,45 +691,45 @@ #-------------------------------------------------------- # Query functions (Supported by Aerospike 3 servers only) #-------------------------------------------------------- - # Query executes a query and returns a recordset. - # The query executor puts records on a channel from separate goroutines. - # The caller can concurrently pops records off the channel through the - # record channel. + # Executes a query for specified partitions and returns a recordset. + # The query executor puts records on the queue from separate threads. + # The caller can concurrently pop records off the queue through the + # recordset.records API. # - # This method is only supported by Aerospike 3 servers. - # If the policy is nil, a default policy will be generated. - def query(statement, options = nil) + # This method is only supported by Aerospike 4.9+ servers. + # If the policy is nil, the default relevant policy will be used. + def query_partitions(partition_filter, statement, options = nil) policy = create_policy(options, QueryPolicy, default_query_policy) new_policy = policy.clone nodes = @cluster.nodes if nodes.empty? - raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.") + raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Query failed because cluster is empty.") end - recordset = Recordset.new(policy.record_queue_size, nodes.length, :query) - - # Use a thread per node - nodes.each do |node| - partitions = node.cluster.node_partitions(node, statement.namespace) - Thread.new do - Thread.current.abort_on_exception = true - command = QueryCommand.new(node, new_policy, statement, recordset, partitions) - begin - execute_command(command) - rescue => e - Aerospike.logger.error(e.backtrace.join("\n")) unless e == QUERY_TERMINATED_EXCEPTION - recordset.cancel(e) - ensure - recordset.thread_finished - end - end + # result recordset + recordset = Recordset.new(policy.record_queue_size, 1, :query) + tracker = PartitionTracker.new(policy, nodes, partition_filter) + Thread.new do + Thread.current.abort_on_exception = true + QueryExecutor.query_partitions(@cluster, policy, tracker, statement, recordset) end recordset + end + + # Query executes a query and returns a recordset. + # The query executor puts records on a channel from separate threads. + # The caller can concurrently pops records off the channel through the + # record channel. + # + # This method is only supported by Aerospike 3 servers. + # If the policy is nil, a default policy will be generated. + def query(statement, options = nil) + query_partitions(Aerospike::PartitionFilter.all, statement, options) end #------------------------------------------------------- # User administration #-------------------------------------------------------