lib/aerospike/cluster/cluster.rb in aerospike-1.0.10 vs lib/aerospike/cluster/cluster.rb in aerospike-1.0.11
- old
+ new
@@ -12,10 +12,11 @@
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
+require 'set'
require 'thread'
require 'timeout'
require 'aerospike/atomic/atomic'
@@ -24,31 +25,42 @@
private
class Cluster
attr_reader :connection_timeout, :connection_queue_size, :user, :password
+ attr_reader :features
def initialize(policy, *hosts)
@cluster_seeds = hosts
+ @fail_if_not_connected = policy.fail_if_not_connected
@connection_queue_size = policy.connection_queue_size
@connection_timeout = policy.timeout
+ @tend_interval = policy.tend_interval
@aliases = {}
@cluster_nodes = []
@partition_write_map = {}
@node_index = Atomic.new(0)
+ @features = Atomic.new(Set.new)
@closed = Atomic.new(true)
@mutex = Mutex.new
+ @cluster_config_change_listeners = Atomic.new([])
+ @old_node_cound = 0
+
# setup auth info for cluster
if policy.requires_authentication
@user = policy.user
@password = AdminCommand.hash_password(policy.password)
end
+ self
+ end
+
+ def connect
wait_till_stablized
- if policy.fail_if_not_connected && !connected?
+ if @fail_if_not_connected && !connected?
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE)
end
launch_tend_thread
@@ -173,39 +185,57 @@
Info.request(conn, *commands).tap do
node.put_connection(conn)
end
end
+ def supports_feature?(feature)
+ @features.get.include?(feature.to_s)
+ end
+
def change_password(user, password)
# change password ONLY if the user is the same
@password = password if @user == user
end
+ def add_cluster_config_change_listener(listener)
+ @cluster_config_change_listeners.update do |listeners|
+ listeners.push(listener)
+ end
+ end
+
+ def remove_cluster_config_change_listener(listener)
+ @cluster_config_change_listeners.update do |listeners|
+ listeners.delete(listener)
+ end
+ end
+
private
def launch_tend_thread
@tend_thread = Thread.new do
abort_on_exception = false
while true
begin
tend
- sleep 1 # 1 second
+ sleep(@tend_interval / 1000.0)
rescue => e
Aerospike.logger.error("Exception occured during tend: #{e}")
end
end
end
end
def tend
nodes = self.nodes
+ cluster_config_changed = false
# All node additions/deletions are performed in tend thread.
# If active nodes don't exist, seed cluster.
if nodes.empty?
Aerospike.logger.info("No connections available; seeding...")
seed_nodes
+ cluster_config_changed = true
# refresh nodes list after seeding
nodes = self.nodes
end
@@ -229,18 +259,37 @@
end
end
# Add nodes in a batch.
add_list = find_nodes_to_add(friend_list)
- add_nodes(add_list) unless add_list.empty?
+ unless add_list.empty?
+ add_nodes(add_list)
+ cluster_config_changed = true
+ end
# Handle nodes changes determined from refreshes.
# Remove nodes in a batch.
remove_list = find_nodes_to_remove(refresh_count)
- remove_nodes(remove_list) unless remove_list.empty?
+ unless remove_list.empty?
+ remove_nodes(remove_list)
+ cluster_config_changed = true
+ end
- Aerospike.logger.info("Tend finished. Live node count: #{nodes.length} #{nodes}")
+ if cluster_config_changed
+ update_cluster_features
+
+ # only log the tend finish IF the number of nodes has been changed.
+ # This prevents spamming the log on every tend interval
+ if @old_node_cound > nodes.length
+ Aerospike.logger.info("Tend finished. #{@old_node_cound - nodes.length} nodes have left the cluster. Old node count: #{@old_node_cound}, New node count #{nodes.length} #{nodes}")
+ else
+ Aerospike.logger.info("Tend finished. #{nodes.length - @old_node_cound} nodes have joined the cluster. Old node count: #{@old_node_cound}, New node count #{nodes.length} #{nodes}")
+ end
+ @old_node_cound = nodes.length
+
+ notify_cluster_config_changed
+ end
end
def wait_till_stablized
count = -1
@@ -261,18 +310,33 @@
end
end
# wait for the thread to finish or timeout
begin
- Timeout.timeout(1) do
+ Timeout.timeout(@connection_timeout) do
thr.join
end
rescue Timeout::Error
thr.kill if thr.alive?
end
@closed.value = false if @cluster_nodes.length > 0
+ end
+
+ def update_cluster_features
+ # Cluster supports features that are supported by all nodes
+ @features.update do |cluster_features|
+ node_featues = self.nodes.map(&:features)
+ cluster_features.replace(node_featues.reduce(&:intersection))
+ end
+ end
+
+ def notify_cluster_config_changed
+ listeners = @cluster_config_change_listeners.get
+ listeners.each do |listener|
+ listener.send(:cluster_config_changed, self)
+ end
end
def set_partitions(part_map)
@mutex.synchronize do
@partition_write_map = part_map