lib/aerospike/client.rb in aerospike-2.23.0 vs lib/aerospike/client.rb in aerospike-2.24.0
- old
+ new
@@ -691,45 +691,45 @@
#--------------------------------------------------------
# Query functions (Supported by Aerospike 3 servers only)
#--------------------------------------------------------
- # Query executes a query and returns a recordset.
- # The query executor puts records on a channel from separate goroutines.
- # The caller can concurrently pops records off the channel through the
- # record channel.
+ # Executes a query for specified partitions and returns a recordset.
+ # The query executor puts records on the queue from separate threads.
+ # The caller can concurrently pop records off the queue through the
+ # recordset.records API.
#
- # This method is only supported by Aerospike 3 servers.
- # If the policy is nil, a default policy will be generated.
- def query(statement, options = nil)
+ # This method is only supported by Aerospike 4.9+ servers.
+ # If the policy is nil, the default relevant policy will be used.
+ def query_partitions(partition_filter, statement, options = nil)
policy = create_policy(options, QueryPolicy, default_query_policy)
new_policy = policy.clone
nodes = @cluster.nodes
if nodes.empty?
- raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.")
+ raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Query failed because cluster is empty.")
end
- recordset = Recordset.new(policy.record_queue_size, nodes.length, :query)
-
- # Use a thread per node
- nodes.each do |node|
- partitions = node.cluster.node_partitions(node, statement.namespace)
- Thread.new do
- Thread.current.abort_on_exception = true
- command = QueryCommand.new(node, new_policy, statement, recordset, partitions)
- begin
- execute_command(command)
- rescue => e
- Aerospike.logger.error(e.backtrace.join("\n")) unless e == QUERY_TERMINATED_EXCEPTION
- recordset.cancel(e)
- ensure
- recordset.thread_finished
- end
- end
+ # result recordset
+ recordset = Recordset.new(policy.record_queue_size, 1, :query)
+ tracker = PartitionTracker.new(policy, nodes, partition_filter)
+ Thread.new do
+ Thread.current.abort_on_exception = true
+ QueryExecutor.query_partitions(@cluster, policy, tracker, statement, recordset)
end
recordset
+ end
+
+ # Query executes a query and returns a recordset.
+ # The query executor puts records on a channel from separate threads.
+ # The caller can concurrently pops records off the channel through the
+ # record channel.
+ #
+ # This method is only supported by Aerospike 3 servers.
+ # If the policy is nil, a default policy will be generated.
+ def query(statement, options = nil)
+ query_partitions(Aerospike::PartitionFilter.all, statement, options)
end
#-------------------------------------------------------
# User administration
#-------------------------------------------------------