lib/aerospike/cluster/cluster.rb in aerospike-0.1.6 vs lib/aerospike/cluster/cluster.rb in aerospike-1.0.0

- old
+ new

@@ -23,11 +23,11 @@ private class Cluster - attr_reader :connection_timeout, :connection_queue_size + attr_reader :connection_timeout, :connection_queue_size, :user, :password def initialize(policy, *hosts) @cluster_seeds = hosts @connection_queue_size = policy.connection_queue_size @connection_timeout = policy.timeout @@ -36,10 +36,16 @@ @partition_write_map = {} @node_index = Atomic.new(0) @closed = Atomic.new(false) @mutex = Mutex.new + # setup auth info for cluster + if policy.requires_authentication + @user = policy.user + @password = AdminCommand.hash_password(policy.password) + end + wait_till_stablized if policy.fail_if_not_connected && !connected? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE) end @@ -86,18 +92,21 @@ # Returns a random node on the cluster def random_node # Must copy array reference for copy on write semantics to work. node_array = nodes length = node_array.length - for i in 0..length + i = 0 + while i < length # Must handle concurrency with other non-tending threads, so node_index is consistent. index = (@node_index.update{|v| v+1} % node_array.length).abs node = node_array[index] if node.active? return node end + + i = i.succ end raise Aerospike::Exceptions::InvalidNode.new end # Returns a list of all nodes in the cluster @@ -164,10 +173,15 @@ Info.request(conn, *commands).tap do node.put_connection(conn) end end + def change_password(user, password) + # change password ONLY if the user is the same + @password = password if @user == user + end + private def launch_tend_thread @tend_thread = Thread.new do abort_on_exception = false @@ -208,11 +222,11 @@ begin friends = node.refresh refresh_count += 1 friend_list.concat(friends) if friends rescue => e - Aerospike.logger.error("Node `#{node}` refresh failed: #{e.to_s}") + Aerospike.logger.error("Node `#{node}` refresh failed: #{e.backtrace.join("\n")}") end end end # Add nodes in a batch. @@ -281,13 +295,13 @@ list = [] seed_array.each do |seed| begin - seed_node_validator = NodeValidator.new(seed, @connection_timeout) + seed_node_validator = NodeValidator.new(self, seed, @connection_timeout) rescue => e - Aerospike.logger.error("Seed #{seed.to_s} failed: #{e}") + Aerospike.logger.error("Seed #{seed.to_s} failed: #{e.backtrace.join("\n")}") next end nv = nil # Seed host may have multiple aliases in the case of round-robin dns configurations. @@ -295,11 +309,11 @@ if aliass == seed nv = seed_node_validator else begin - nv = NodeValidator.new(aliass, @connection_timeout) + nv = NodeValidator.new(self, aliass, @connection_timeout) rescue Exection => e Aerospike.logger.error("Seed #{seed.to_s} failed: #{e}") next end end @@ -343,11 +357,11 @@ def find_nodes_to_add(hosts) list = [] hosts.each do |host| begin - nv = NodeValidator.new(host, @connection_timeout) + nv = NodeValidator.new(self, host, @connection_timeout) # if node is already in cluster's node list, # or already included in the list to be added, we should skip it node = find_node_by_name(nv.name) node ||= list.detect{|n| n.name == nv.name} @@ -430,15 +444,18 @@ partitions_list = partitions partitions_list.each do |node_array| max = node_array.length - for i in 0...max + i = 0 + while i < max node = node_array[i] # Use reference equality for performance. if node == filter return true end + + i = i.succ end end false end