lib/aerospike/cluster/cluster.rb in aerospike-1.0.10 vs lib/aerospike/cluster/cluster.rb in aerospike-1.0.11

- old
+ new

@@ -12,10 +12,11 @@ # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. +require 'set' require 'thread' require 'timeout' require 'aerospike/atomic/atomic' @@ -24,31 +25,42 @@ private class Cluster attr_reader :connection_timeout, :connection_queue_size, :user, :password + attr_reader :features def initialize(policy, *hosts) @cluster_seeds = hosts + @fail_if_not_connected = policy.fail_if_not_connected @connection_queue_size = policy.connection_queue_size @connection_timeout = policy.timeout + @tend_interval = policy.tend_interval @aliases = {} @cluster_nodes = [] @partition_write_map = {} @node_index = Atomic.new(0) + @features = Atomic.new(Set.new) @closed = Atomic.new(true) @mutex = Mutex.new + @cluster_config_change_listeners = Atomic.new([]) + @old_node_cound = 0 + # setup auth info for cluster if policy.requires_authentication @user = policy.user @password = AdminCommand.hash_password(policy.password) end + self + end + + def connect wait_till_stablized - if policy.fail_if_not_connected && !connected? + if @fail_if_not_connected && !connected? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE) end launch_tend_thread @@ -173,39 +185,57 @@ Info.request(conn, *commands).tap do node.put_connection(conn) end end + def supports_feature?(feature) + @features.get.include?(feature.to_s) + end + def change_password(user, password) # change password ONLY if the user is the same @password = password if @user == user end + def add_cluster_config_change_listener(listener) + @cluster_config_change_listeners.update do |listeners| + listeners.push(listener) + end + end + + def remove_cluster_config_change_listener(listener) + @cluster_config_change_listeners.update do |listeners| + listeners.delete(listener) + end + end + private def launch_tend_thread @tend_thread = Thread.new do abort_on_exception = false while true begin tend - sleep 1 # 1 second + sleep(@tend_interval / 1000.0) rescue => e Aerospike.logger.error("Exception occured during tend: #{e}") end end end end def tend nodes = self.nodes + cluster_config_changed = false # All node additions/deletions are performed in tend thread. # If active nodes don't exist, seed cluster. if nodes.empty? Aerospike.logger.info("No connections available; seeding...") seed_nodes + cluster_config_changed = true # refresh nodes list after seeding nodes = self.nodes end @@ -229,18 +259,37 @@ end end # Add nodes in a batch. add_list = find_nodes_to_add(friend_list) - add_nodes(add_list) unless add_list.empty? + unless add_list.empty? + add_nodes(add_list) + cluster_config_changed = true + end # Handle nodes changes determined from refreshes. # Remove nodes in a batch. remove_list = find_nodes_to_remove(refresh_count) - remove_nodes(remove_list) unless remove_list.empty? + unless remove_list.empty? + remove_nodes(remove_list) + cluster_config_changed = true + end - Aerospike.logger.info("Tend finished. Live node count: #{nodes.length} #{nodes}") + if cluster_config_changed + update_cluster_features + + # only log the tend finish IF the number of nodes has been changed. + # This prevents spamming the log on every tend interval + if @old_node_cound > nodes.length + Aerospike.logger.info("Tend finished. #{@old_node_cound - nodes.length} nodes have left the cluster. Old node count: #{@old_node_cound}, New node count #{nodes.length} #{nodes}") + else + Aerospike.logger.info("Tend finished. #{nodes.length - @old_node_cound} nodes have joined the cluster. Old node count: #{@old_node_cound}, New node count #{nodes.length} #{nodes}") + end + @old_node_cound = nodes.length + + notify_cluster_config_changed + end end def wait_till_stablized count = -1 @@ -261,18 +310,33 @@ end end # wait for the thread to finish or timeout begin - Timeout.timeout(1) do + Timeout.timeout(@connection_timeout) do thr.join end rescue Timeout::Error thr.kill if thr.alive? end @closed.value = false if @cluster_nodes.length > 0 + end + + def update_cluster_features + # Cluster supports features that are supported by all nodes + @features.update do |cluster_features| + node_featues = self.nodes.map(&:features) + cluster_features.replace(node_featues.reduce(&:intersection)) + end + end + + def notify_cluster_config_changed + listeners = @cluster_config_change_listeners.get + listeners.each do |listener| + listener.send(:cluster_config_changed, self) + end end def set_partitions(part_map) @mutex.synchronize do @partition_write_map = part_map