lib/aerospike/cluster/cluster.rb in aerospike-0.1.6 vs lib/aerospike/cluster/cluster.rb in aerospike-1.0.0
- old
+ new
@@ -23,11 +23,11 @@
private
class Cluster
- attr_reader :connection_timeout, :connection_queue_size
+ attr_reader :connection_timeout, :connection_queue_size, :user, :password
def initialize(policy, *hosts)
@cluster_seeds = hosts
@connection_queue_size = policy.connection_queue_size
@connection_timeout = policy.timeout
@@ -36,10 +36,16 @@
@partition_write_map = {}
@node_index = Atomic.new(0)
@closed = Atomic.new(false)
@mutex = Mutex.new
+ # setup auth info for cluster
+ if policy.requires_authentication
+ @user = policy.user
+ @password = AdminCommand.hash_password(policy.password)
+ end
+
wait_till_stablized
if policy.fail_if_not_connected && !connected?
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE)
end
@@ -86,18 +92,21 @@
# Returns a random node on the cluster
def random_node
# Must copy array reference for copy on write semantics to work.
node_array = nodes
length = node_array.length
- for i in 0..length
+ i = 0
+ while i < length
# Must handle concurrency with other non-tending threads, so node_index is consistent.
index = (@node_index.update{|v| v+1} % node_array.length).abs
node = node_array[index]
if node.active?
return node
end
+
+ i = i.succ
end
raise Aerospike::Exceptions::InvalidNode.new
end
# Returns a list of all nodes in the cluster
@@ -164,10 +173,15 @@
Info.request(conn, *commands).tap do
node.put_connection(conn)
end
end
+ def change_password(user, password)
+ # change password ONLY if the user is the same
+ @password = password if @user == user
+ end
+
private
def launch_tend_thread
@tend_thread = Thread.new do
abort_on_exception = false
@@ -208,11 +222,11 @@
begin
friends = node.refresh
refresh_count += 1
friend_list.concat(friends) if friends
rescue => e
- Aerospike.logger.error("Node `#{node}` refresh failed: #{e.to_s}")
+ Aerospike.logger.error("Node `#{node}` refresh failed: #{e.backtrace.join("\n")}")
end
end
end
# Add nodes in a batch.
@@ -281,13 +295,13 @@
list = []
seed_array.each do |seed|
begin
- seed_node_validator = NodeValidator.new(seed, @connection_timeout)
+ seed_node_validator = NodeValidator.new(self, seed, @connection_timeout)
rescue => e
- Aerospike.logger.error("Seed #{seed.to_s} failed: #{e}")
+ Aerospike.logger.error("Seed #{seed.to_s} failed: #{e.backtrace.join("\n")}")
next
end
nv = nil
# Seed host may have multiple aliases in the case of round-robin dns configurations.
@@ -295,11 +309,11 @@
if aliass == seed
nv = seed_node_validator
else
begin
- nv = NodeValidator.new(aliass, @connection_timeout)
+ nv = NodeValidator.new(self, aliass, @connection_timeout)
rescue Exection => e
Aerospike.logger.error("Seed #{seed.to_s} failed: #{e}")
next
end
end
@@ -343,11 +357,11 @@
def find_nodes_to_add(hosts)
list = []
hosts.each do |host|
begin
- nv = NodeValidator.new(host, @connection_timeout)
+ nv = NodeValidator.new(self, host, @connection_timeout)
# if node is already in cluster's node list,
# or already included in the list to be added, we should skip it
node = find_node_by_name(nv.name)
node ||= list.detect{|n| n.name == nv.name}
@@ -430,15 +444,18 @@
partitions_list = partitions
partitions_list.each do |node_array|
max = node_array.length
- for i in 0...max
+ i = 0
+ while i < max
node = node_array[i]
# Use reference equality for performance.
if node == filter
return true
end
+
+ i = i.succ
end
end
false
end