lib/aerospike/client.rb in aerospike-2.13.0 vs lib/aerospike/client.rb in aerospike-2.14.0
- old
+ new
@@ -42,10 +42,11 @@
attr_accessor :default_info_policy
attr_accessor :default_query_policy
attr_accessor :default_read_policy
attr_accessor :default_scan_policy
attr_accessor :default_write_policy
+ attr_accessor :cluster
def initialize(hosts = nil, policy: ClientPolicy.new, connect: true)
hosts = ::Aerospike::Host::Parse.(hosts || ENV['AEROSPIKE_HOSTS'] || 'localhost')
policy = create_policy(policy, ClientPolicy)
@@ -222,23 +223,33 @@
# If no policy options are provided, +@default_info_policy+ will be used.
def truncate(namespace, set_name = nil, before_last_update = nil, options = {})
policy = create_policy(options, Policy, default_info_policy)
- str_cmd = "truncate:namespace=#{namespace}"
- str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty?
+ node = @cluster.random_node
+ conn = node.get_connection(policy.timeout)
+ if set_name && !set_name.to_s.strip.empty?
+ str_cmd = "truncate:namespace=#{namespace}"
+ str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty?
+ else
+ if node.supports_feature(Aerospike::Features::TRUNCATE_NAMESPACE)
+ str_cmd = "truncate-namespace:namespace=#{namespace}"
+ else
+ str_cmd = "truncate:namespace=#{namespace}"
+ end
+ end
+
if before_last_update
lut_nanos = (before_last_update.to_f * 1_000_000_000.0).round
str_cmd << ";lut=#{lut_nanos}"
elsif supports_feature?(Aerospike::Features::LUT_NOW)
# Servers >= 4.3.1.4 require lut argument
str_cmd << ";lut=now"
end
- # Send index command to one node. That node will distribute the command to other nodes.
- response = send_info_command(policy, str_cmd).upcase
+ response = send_info_command(policy, str_cmd, node).upcase
return if response == 'OK'
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_ERROR, "Truncate failed: #{response}")
end
#-------------------------------------------------------
@@ -816,12 +827,16 @@
self.default_query_policy = create_policy(policies[:query], QueryPolicy)
self.default_scan_policy = create_policy(policies[:scan], ScanPolicy)
self.default_write_policy = create_policy(policies[:write], WritePolicy)
end
- def send_info_command(policy, command)
+ def send_info_command(policy, command, node = nil)
Aerospike.logger.debug { "Sending info command: #{command}" }
- _, response = @cluster.request_info(policy, command).first
+ if node
+ _, response = @cluster.request_node_info(node, policy, command).first
+ else
+ _, response = @cluster.request_info(policy, command).first
+ end
response.to_s
end
def hash_to_bins(hash)
if hash.is_a?(Bin)