lib/aerospike/client.rb in aerospike-1.0.10 vs lib/aerospike/client.rb in aerospike-1.0.11

- old
+ new

@@ -47,10 +47,12 @@ @default_admin_policy = QueryPolicy.new policy = opt_to_client_policy(options) @cluster = Cluster.new(policy, Host.new(host, port)) + @cluster.add_cluster_config_change_listener(self) + @cluster.connect self end def self.new_many(hosts, options={}) @@ -63,11 +65,14 @@ client.default_query_policy = QueryPolicy.new client.default_admin_policy = QueryPolicy.new policy = client.send(:opt_to_client_policy , options) - client.send(:cluster=, Cluster.new(policy, *hosts)) + cluster = Cluster.new(policy, *hosts) + cluster.add_cluster_config_change_listener(client) + client.send(:cluster=, cluster) + cluster.connect client end ## @@ -117,11 +122,11 @@ # client.put key, {'bin', 'value string'}, :timeout => 0.001 def put(key, bins, options={}) policy = opt_to_write_policy(options) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::WRITE) - command.execute + execute_command(command) end #------------------------------------------------------- # Operations string #------------------------------------------------------- @@ -140,11 +145,11 @@ # client.append key, {'bin', 'value to append'}, :timeout => 0.001 def append(key, bins, options={}) policy = opt_to_write_policy(options) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::APPEND) - command.execute + execute_command(command) end ## # Prepends bin values string to existing record bin values. # The policy specifies the transaction timeout, record expiration and @@ -159,11 +164,11 @@ # client.prepend key, {'bin', 'value to prepend'}, :timeout => 0.001 def prepend(key, bins, options={}) policy = opt_to_write_policy(options) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::PREPEND) - command.execute + execute_command(command) end #------------------------------------------------------- # Arithmetic Operations #------------------------------------------------------- @@ -182,11 +187,11 @@ # client.add key, {'bin', -1}, :timeout => 0.001 def add(key, bins, options={}) policy = opt_to_write_policy(options) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::ADD) - command.execute + execute_command(command) end #------------------------------------------------------- # Delete Operations #------------------------------------------------------- @@ -204,11 +209,11 @@ # existed = client.delete key, :timeout => 0.001 def delete(key, options={}) policy = opt_to_write_policy(options) command = DeleteCommand.new(@cluster, policy, key) - command.execute + execute_command(command) command.existed end #------------------------------------------------------- # Touch Operations @@ -225,11 +230,11 @@ # client.touch key, :timeout => 0.001 def touch(key, options={}) policy = opt_to_write_policy(options) command = TouchCommand.new(@cluster, policy, key) - command.execute + execute_command(command) end #------------------------------------------------------- # Existence-Check Operations #------------------------------------------------------- @@ -238,11 +243,11 @@ # Determines if a record key exists. # The policy can be used to specify timeouts. def exists(key, options={}) policy = opt_to_policy(options) command = ExistsCommand.new(@cluster, policy, key) - command.execute + execute_command(command) command.exists end # Check if multiple record keys exist in one batch call. # The returned array bool is in positional order with the original key array order. @@ -272,20 +277,20 @@ # The policy can be used to specify timeouts. def get(key, bin_names=[], options={}) policy = opt_to_policy(options) command = ReadCommand.new(@cluster, policy, key, bin_names) - command.execute + execute_command(command) command.record end # Read record generation and expiration only for specified key. Bins are not read. # The policy can be used to specify timeouts. def get_header(key, options={}) policy = opt_to_policy(options) command = ReadHeaderCommand.new(@cluster, policy, key) - command.execute + execute_command(command) command.record end #------------------------------------------------------- # Batch Read Operations @@ -353,11 +358,11 @@ # relative to read operations. def operate(key, operations, options={}) policy = opt_to_write_policy(options) command = OperateCommand.new(@cluster, policy, key, operations) - command.execute + execute_command(command) command.record end #------------------------------------------------------------------- # Large collection functions (Supported by Aerospike 3 servers only) @@ -504,11 +509,11 @@ # This method is only supported by Aerospike 3 servers. def execute_udf(key, package_name, function_name, args=[], options={}) policy = opt_to_write_policy(options) command = ExecuteCommand.new(@cluster, policy, key, package_name, function_name, args) - command.execute + execute_command(command) record = command.record return nil if !record || record.bins.length == 0 @@ -551,11 +556,11 @@ nodes.each do |node| Thread.new do abort_on_exception = true begin command = QueryCommand.new(node, policy, statement, nil) - command.execute + execute_command(command) rescue => e Aerospike.logger.error(e) raise e end end @@ -569,11 +574,11 @@ # This asynchronous server call will return before command is complete. # The user can optionally wait for command completion by using the returned # IndexTask instance. # # This method is only supported by Aerospike 3 servers. - # index_type should be between :string or :numeric + # index_type should be :string, :numeric or :geo2dsphere (requires server version 3.7 or later) def create_index(namespace, set_name, index_name, bin_name, index_type, options={}) policy = opt_to_write_policy(options) str_cmd = "sindex-create:ns=#{namespace}" str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty? str_cmd << ";indexname=#{index_name};numbins=1;indexdata=#{bin_name},#{index_type.to_s.upcase}" @@ -649,11 +654,11 @@ nodes.each do |node| Thread.new do abort_on_exception = true command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset) begin - command.execute + execute_command(command) rescue => e Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION recordset.cancel(e) ensure recordset.thread_finished @@ -664,11 +669,11 @@ Thread.new do abort_on_exception = true nodes.each do |node| command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset) begin - command.execute + execute_command(command) rescue => e Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION recordset.cancel(e) ensure recordset.thread_finished @@ -699,11 +704,11 @@ Thread.new do abort_on_exception = true command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset) begin - command.execute + execute_command(command) rescue => e Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION recordset.cancel(e) ensure recordset.thread_finished @@ -739,11 +744,11 @@ nodes.each do |node| Thread.new do abort_on_exception = true command = QueryCommand.new(node, new_policy, statement, recordset) begin - command.execute + execute_command(command) rescue => e Aerospike.logger.error(e.backtrace.join("\n")) unless e == QUERY_TERMINATED_EXCEPTION recordset.cancel(e) ensure recordset.thread_finished @@ -904,10 +909,40 @@ def cluster=(cluster) @cluster = cluster end + def cluster_config_changed(cluster) + Aerospike.logger.debug { "Cluster config change detected; active nodes: #{cluster.nodes.map(&:name)}" } + setup_command_validators + end + + def setup_command_validators + Aerospike.logger.debug { "Cluster features: #{@cluster.features.get.to_a}" } + validators = [] + + # guard against unsupported particle types + unsupported_particle_types = [] + unsupported_particle_types << Aerospike::ParticleType::DOUBLE unless @cluster.supports_feature?("float") + unsupported_particle_types << Aerospike::ParticleType::GEOJSON unless @cluster.supports_feature?("geo") + validators << UnsupportedParticleTypeValidator.new(*unsupported_particle_types) unless unsupported_particle_types.empty? + + @command_validators = validators + end + + def validate_command(command) + return unless @command_validators + @command_validators.each do |validator| + validator.call(command) + end + end + + def execute_command(command) + validate_command(command) + command.execute + end + def batch_execute(keys, &cmd_gen) batch_nodes = BatchNode.generate_list(@cluster, keys) threads = [] # Use a thread per namespace per node @@ -916,10 +951,10 @@ bn = batch_node bn.batch_namespaces.each do |bns| threads << Thread.new do abort_on_exception=true command = cmd_gen.call(bn.node, bns) - command.execute + execute_command(command) end end end threads.each { |thr| thr.join }