lib/aerospike/client.rb in aerospike-1.0.10 vs lib/aerospike/client.rb in aerospike-1.0.11
- old
+ new
@@ -47,10 +47,12 @@
@default_admin_policy = QueryPolicy.new
policy = opt_to_client_policy(options)
@cluster = Cluster.new(policy, Host.new(host, port))
+ @cluster.add_cluster_config_change_listener(self)
+ @cluster.connect
self
end
def self.new_many(hosts, options={})
@@ -63,11 +65,14 @@
client.default_query_policy = QueryPolicy.new
client.default_admin_policy = QueryPolicy.new
policy = client.send(:opt_to_client_policy , options)
- client.send(:cluster=, Cluster.new(policy, *hosts))
+ cluster = Cluster.new(policy, *hosts)
+ cluster.add_cluster_config_change_listener(client)
+ client.send(:cluster=, cluster)
+ cluster.connect
client
end
##
@@ -117,11 +122,11 @@
# client.put key, {'bin', 'value string'}, :timeout => 0.001
def put(key, bins, options={})
policy = opt_to_write_policy(options)
command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::WRITE)
- command.execute
+ execute_command(command)
end
#-------------------------------------------------------
# Operations string
#-------------------------------------------------------
@@ -140,11 +145,11 @@
# client.append key, {'bin', 'value to append'}, :timeout => 0.001
def append(key, bins, options={})
policy = opt_to_write_policy(options)
command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::APPEND)
- command.execute
+ execute_command(command)
end
##
# Prepends bin values string to existing record bin values.
# The policy specifies the transaction timeout, record expiration and
@@ -159,11 +164,11 @@
# client.prepend key, {'bin', 'value to prepend'}, :timeout => 0.001
def prepend(key, bins, options={})
policy = opt_to_write_policy(options)
command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::PREPEND)
- command.execute
+ execute_command(command)
end
#-------------------------------------------------------
# Arithmetic Operations
#-------------------------------------------------------
@@ -182,11 +187,11 @@
# client.add key, {'bin', -1}, :timeout => 0.001
def add(key, bins, options={})
policy = opt_to_write_policy(options)
command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::ADD)
- command.execute
+ execute_command(command)
end
#-------------------------------------------------------
# Delete Operations
#-------------------------------------------------------
@@ -204,11 +209,11 @@
# existed = client.delete key, :timeout => 0.001
def delete(key, options={})
policy = opt_to_write_policy(options)
command = DeleteCommand.new(@cluster, policy, key)
- command.execute
+ execute_command(command)
command.existed
end
#-------------------------------------------------------
# Touch Operations
@@ -225,11 +230,11 @@
# client.touch key, :timeout => 0.001
def touch(key, options={})
policy = opt_to_write_policy(options)
command = TouchCommand.new(@cluster, policy, key)
- command.execute
+ execute_command(command)
end
#-------------------------------------------------------
# Existence-Check Operations
#-------------------------------------------------------
@@ -238,11 +243,11 @@
# Determines if a record key exists.
# The policy can be used to specify timeouts.
def exists(key, options={})
policy = opt_to_policy(options)
command = ExistsCommand.new(@cluster, policy, key)
- command.execute
+ execute_command(command)
command.exists
end
# Check if multiple record keys exist in one batch call.
# The returned array bool is in positional order with the original key array order.
@@ -272,20 +277,20 @@
# The policy can be used to specify timeouts.
def get(key, bin_names=[], options={})
policy = opt_to_policy(options)
command = ReadCommand.new(@cluster, policy, key, bin_names)
- command.execute
+ execute_command(command)
command.record
end
# Read record generation and expiration only for specified key. Bins are not read.
# The policy can be used to specify timeouts.
def get_header(key, options={})
policy = opt_to_policy(options)
command = ReadHeaderCommand.new(@cluster, policy, key)
- command.execute
+ execute_command(command)
command.record
end
#-------------------------------------------------------
# Batch Read Operations
@@ -353,11 +358,11 @@
# relative to read operations.
def operate(key, operations, options={})
policy = opt_to_write_policy(options)
command = OperateCommand.new(@cluster, policy, key, operations)
- command.execute
+ execute_command(command)
command.record
end
#-------------------------------------------------------------------
# Large collection functions (Supported by Aerospike 3 servers only)
@@ -504,11 +509,11 @@
# This method is only supported by Aerospike 3 servers.
def execute_udf(key, package_name, function_name, args=[], options={})
policy = opt_to_write_policy(options)
command = ExecuteCommand.new(@cluster, policy, key, package_name, function_name, args)
- command.execute
+ execute_command(command)
record = command.record
return nil if !record || record.bins.length == 0
@@ -551,11 +556,11 @@
nodes.each do |node|
Thread.new do
abort_on_exception = true
begin
command = QueryCommand.new(node, policy, statement, nil)
- command.execute
+ execute_command(command)
rescue => e
Aerospike.logger.error(e)
raise e
end
end
@@ -569,11 +574,11 @@
# This asynchronous server call will return before command is complete.
# The user can optionally wait for command completion by using the returned
# IndexTask instance.
#
# This method is only supported by Aerospike 3 servers.
- # index_type should be between :string or :numeric
+ # index_type should be :string, :numeric or :geo2dsphere (requires server version 3.7 or later)
def create_index(namespace, set_name, index_name, bin_name, index_type, options={})
policy = opt_to_write_policy(options)
str_cmd = "sindex-create:ns=#{namespace}"
str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty?
str_cmd << ";indexname=#{index_name};numbins=1;indexdata=#{bin_name},#{index_type.to_s.upcase}"
@@ -649,11 +654,11 @@
nodes.each do |node|
Thread.new do
abort_on_exception = true
command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset)
begin
- command.execute
+ execute_command(command)
rescue => e
Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION
recordset.cancel(e)
ensure
recordset.thread_finished
@@ -664,11 +669,11 @@
Thread.new do
abort_on_exception = true
nodes.each do |node|
command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset)
begin
- command.execute
+ execute_command(command)
rescue => e
Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION
recordset.cancel(e)
ensure
recordset.thread_finished
@@ -699,11 +704,11 @@
Thread.new do
abort_on_exception = true
command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset)
begin
- command.execute
+ execute_command(command)
rescue => e
Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION
recordset.cancel(e)
ensure
recordset.thread_finished
@@ -739,11 +744,11 @@
nodes.each do |node|
Thread.new do
abort_on_exception = true
command = QueryCommand.new(node, new_policy, statement, recordset)
begin
- command.execute
+ execute_command(command)
rescue => e
Aerospike.logger.error(e.backtrace.join("\n")) unless e == QUERY_TERMINATED_EXCEPTION
recordset.cancel(e)
ensure
recordset.thread_finished
@@ -904,10 +909,40 @@
def cluster=(cluster)
@cluster = cluster
end
+ def cluster_config_changed(cluster)
+ Aerospike.logger.debug { "Cluster config change detected; active nodes: #{cluster.nodes.map(&:name)}" }
+ setup_command_validators
+ end
+
+ def setup_command_validators
+ Aerospike.logger.debug { "Cluster features: #{@cluster.features.get.to_a}" }
+ validators = []
+
+ # guard against unsupported particle types
+ unsupported_particle_types = []
+ unsupported_particle_types << Aerospike::ParticleType::DOUBLE unless @cluster.supports_feature?("float")
+ unsupported_particle_types << Aerospike::ParticleType::GEOJSON unless @cluster.supports_feature?("geo")
+ validators << UnsupportedParticleTypeValidator.new(*unsupported_particle_types) unless unsupported_particle_types.empty?
+
+ @command_validators = validators
+ end
+
+ def validate_command(command)
+ return unless @command_validators
+ @command_validators.each do |validator|
+ validator.call(command)
+ end
+ end
+
+ def execute_command(command)
+ validate_command(command)
+ command.execute
+ end
+
def batch_execute(keys, &cmd_gen)
batch_nodes = BatchNode.generate_list(@cluster, keys)
threads = []
# Use a thread per namespace per node
@@ -916,10 +951,10 @@
bn = batch_node
bn.batch_namespaces.each do |bns|
threads << Thread.new do
abort_on_exception=true
command = cmd_gen.call(bn.node, bns)
- command.execute
+ execute_command(command)
end
end
end
threads.each { |thr| thr.join }