lib/aerospike/client.rb in aerospike-2.8.0 vs lib/aerospike/client.rb in aerospike-2.9.0

- old
+ new

@@ -35,22 +35,23 @@ # #=> raises Aerospike::Exceptions::Timeout if a +:timeout+ is specified and # +:fail_if_not_connected+ set to true class Client - attr_accessor :default_policy, :default_write_policy, - :default_scan_policy, :default_query_policy, :default_admin_policy + attr_accessor :default_admin_policy + attr_accessor :default_batch_policy + attr_accessor :default_info_policy + attr_accessor :default_query_policy + attr_accessor :default_read_policy + attr_accessor :default_scan_policy + attr_accessor :default_write_policy def initialize(hosts = nil, policy: ClientPolicy.new, connect: true) - @default_policy = Policy.new - @default_write_policy = WritePolicy.new - @default_scan_policy = ScanPolicy.new - @default_query_policy = QueryPolicy.new - @default_admin_policy = QueryPolicy.new hosts = ::Aerospike::Host::Parse.(hosts || ENV['AEROSPIKE_HOSTS'] || 'localhost') policy = create_policy(policy, ClientPolicy) + set_default_policies(policy.policies) @cluster = Cluster.new(policy, hosts) @cluster.add_cluster_config_change_listener(self) self.connect if connect self @@ -110,11 +111,11 @@ # Examples: # # client.put key, {'bin', 'value string'}, :timeout => 0.001 def put(key, bins, options = nil) - policy = create_policy(options, WritePolicy) + policy = create_policy(options, WritePolicy, default_write_policy) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::WRITE) execute_command(command) end #------------------------------------------------------- @@ -133,11 +134,11 @@ # Examples: # # client.append key, {'bin', 'value to append'}, :timeout => 0.001 def append(key, bins, options = nil) - policy = create_policy(options, WritePolicy) + policy = create_policy(options, WritePolicy, default_write_policy) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::APPEND) execute_command(command) end ## @@ -152,11 +153,11 @@ # Examples: # # client.prepend key, {'bin', 'value to prepend'}, :timeout => 0.001 def prepend(key, bins, options = nil) - policy = create_policy(options, WritePolicy) + policy = create_policy(options, WritePolicy, default_write_policy) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::PREPEND) execute_command(command) end #------------------------------------------------------- @@ -175,11 +176,11 @@ # Examples: # # client.add key, {'bin', -1}, :timeout => 0.001 def add(key, bins, options = nil) - policy = create_policy(options, WritePolicy) + policy = create_policy(options, WritePolicy, default_write_policy) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::ADD) execute_command(command) end #------------------------------------------------------- @@ -197,11 +198,11 @@ # Examples: # # existed = client.delete key, :timeout => 0.001 def delete(key, options = nil) - policy = create_policy(options, WritePolicy) + policy = create_policy(options, WritePolicy, default_write_policy) command = DeleteCommand.new(@cluster, policy, key) execute_command(command) command.existed end @@ -219,15 +220,23 @@ # cut-off (set at the time of the truncate call.) # # If no policy options are provided, +@default_info_policy+ will be used. def truncate(namespace, set_name = nil, before_last_update = nil, options = {}) - policy = create_policy(options, WritePolicy) + policy = create_policy(options, Policy, default_info_policy) + str_cmd = "truncate:namespace=#{namespace}" str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty? - str_cmd << ";lut=#{(before_last_update.to_f * 1_000_000_000.0).round}" if before_last_update + if before_last_update + lut_nanos = (before_last_update.to_f * 1_000_000_000.0).round + str_cmd << ";lut=#{lut_nanos}" + elsif supports_feature?(Aerospike::Features::LUT_NOW) + # Servers >= 4.3.1.4 require lut argument + str_cmd << ";lut=now" + end + # Send index command to one node. That node will distribute the command to other nodes. response = send_info_command(policy, str_cmd).upcase return if response == 'OK' raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_ERROR, "Truncate failed: #{response}") end @@ -245,11 +254,11 @@ # Examples: # # client.touch key, :timeout => 0.001 def touch(key, options = nil) - policy = create_policy(options, WritePolicy) + policy = create_policy(options, WritePolicy, default_write_policy) command = TouchCommand.new(@cluster, policy, key) execute_command(command) end #------------------------------------------------------- @@ -258,11 +267,11 @@ ## # Determines if a record key exists. # The policy can be used to specify timeouts. def exists(key, options = nil) - policy = create_policy(options, Policy) + policy = create_policy(options, Policy, default_read_policy) command = ExistsCommand.new(@cluster, policy, key) execute_command(command) command.exists end @@ -271,21 +280,21 @@ #------------------------------------------------------- # Read record header and bins for specified key. # The policy can be used to specify timeouts. def get(key, bin_names = nil, options = nil) - policy = create_policy(options, Policy) + policy = create_policy(options, Policy, default_read_policy) command = ReadCommand.new(@cluster, policy, key, bin_names) execute_command(command) command.record end # Read record generation and expiration only for specified key. Bins are not read. # The policy can be used to specify timeouts. def get_header(key, options = nil) - policy = create_policy(options, Policy) + policy = create_policy(options, Policy, default_read_policy) command = ReadHeaderCommand.new(@cluster, policy, key) execute_command(command) command.record end @@ -296,11 +305,11 @@ # Read multiple record headers and bins for specified keys in one batch call. # The returned records are in positional order with the original key array order. # If a key is not found, the positional record will be nil. # The policy can be used to specify timeouts and protocol type. def batch_get(keys, bin_names = nil, options = nil) - policy = create_policy(options, BatchPolicy) + policy = create_policy(options, BatchPolicy, default_batch_policy) results = Array.new(keys.length) info_flags = INFO1_READ case bin_names when :all, nil, [] @@ -335,11 +344,11 @@ # Check if multiple record keys exist in one batch call. # The returned boolean array is in positional order with the original key array order. # The policy can be used to specify timeouts and protocol type. def batch_exists(keys, options = nil) - policy = create_policy(options, BatchPolicy) + policy = create_policy(options, BatchPolicy, default_batch_policy) results = Array.new(keys.length) if policy.use_batch_direct key_map = BatchItem.generate_map(keys) execute_batch_direct_commands(keys) do |node, batch| @@ -361,11 +370,11 @@ # Perform multiple read/write operations on a single key in one batch call. # An example would be to add an integer value to an existing record and then # read the result, all in one database call. Operations are executed in # the order they are specified. def operate(key, operations, options = nil) - policy = create_policy(options, WritePolicy) + policy = create_policy(options, WritePolicy, default_write_policy) command = OperateCommand.new(@cluster, policy, key, operations) execute_command(command) command.record end @@ -390,16 +399,18 @@ # The user can optionally wait for command completion by using the returned # RegisterTask instance. # # This method is only supported by Aerospike 3 servers. def register_udf(udf_body, server_path, language, options = nil) - content = Base64.strict_encode64(udf_body).force_encoding('binary') + policy = create_policy(options, Policy, default_info_policy) + content = Base64.strict_encode64(udf_body).force_encoding('binary') str_cmd = "udf-put:filename=#{server_path};content=#{content};" str_cmd << "content-len=#{content.length};udf-type=#{language};" + # Send UDF to one node. That node will distribute the UDF to other nodes. - response_map = @cluster.request_info(@default_policy, str_cmd) + response_map = @cluster.request_info(policy, str_cmd) res = {} response_map.each do |k, response| vals = response.to_s.split(';') vals.each do |pair| @@ -420,15 +431,17 @@ # The user can optionally wait for command completion by using the returned # RemoveTask instance. # # This method is only supported by Aerospike 3 servers. def remove_udf(udf_name, options = nil) + policy = create_policy(options, Policy, default_info_policy) + str_cmd = "udf-remove:filename=#{udf_name};" # Send command to one node. That node will distribute it to other nodes. # Send UDF to one node. That node will distribute the UDF to other nodes. - response_map = @cluster.request_info(@default_policy, str_cmd) + response_map = @cluster.request_info(policy, str_cmd) _, response = response_map.first if response == 'ok' UdfRemoveTask.new(@cluster, udf_name) else @@ -437,14 +450,16 @@ end # ListUDF lists all packages containing user defined functions in the server. # This method is only supported by Aerospike 3 servers. def list_udf(options = nil) + policy = create_policy(options, Policy, default_info_policy) + str_cmd = 'udf-list' # Send command to one node. That node will distribute it to other nodes. - response_map = @cluster.request_info(@default_policy, str_cmd) + response_map = @cluster.request_info(policy, str_cmd) _, response = response_map.first vals = response.split(';') vals.map do |udf_info| @@ -473,11 +488,11 @@ # # udf file = <server udf dir>/<package name>.lua # # This method is only supported by Aerospike 3 servers. def execute_udf(key, package_name, function_name, args=[], options = nil) - policy = create_policy(options, WritePolicy) + policy = create_policy(options, WritePolicy, default_write_policy) command = ExecuteCommand.new(@cluster, policy, key, package_name, function_name, args) execute_command(command) record = command.record @@ -502,11 +517,11 @@ # ExecuteTask instance. # # This method is only supported by Aerospike 3 servers. # If the policy is nil, the default relevant policy will be used. def execute_udf_on_query(statement, package_name, function_name, function_args=[], options = nil) - policy = create_policy(options, QueryPolicy) + policy = create_policy(options, QueryPolicy, default_query_policy) nodes = @cluster.nodes if nodes.empty? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing UDF failed because cluster is empty.") end @@ -538,15 +553,16 @@ # IndexTask instance. # # This method is only supported by Aerospike 3 servers. # index_type should be :string, :numeric or :geo2dsphere (requires server version 3.7 or later) # collection_type should be :list, :mapkeys or :mapvalues - def create_index(namespace, set_name, index_name, bin_name, index_type, collection_type=nil, options = nil) + def create_index(namespace, set_name, index_name, bin_name, index_type, collection_type = nil, options = nil) if options.nil? && collection_type.is_a?(Hash) options, collection_type = collection_type, nil end - policy = create_policy(options, WritePolicy) + policy = create_policy(options, Policy, default_info_policy) + str_cmd = "sindex-create:ns=#{namespace}" str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty? str_cmd << ";indexname=#{index_name};numbins=1" str_cmd << ";indextype=#{collection_type.to_s.upcase}" if collection_type str_cmd << ";indexdata=#{bin_name},#{index_type.to_s.upcase}" @@ -568,11 +584,12 @@ end # Delete secondary index. # This method is only supported by Aerospike 3 servers. def drop_index(namespace, set_name, index_name, options = nil) - policy = create_policy(options, WritePolicy) + policy = create_policy(options, Policy, default_info_policy) + str_cmd = "sindex-delete:ns=#{namespace}" str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty? str_cmd << ";indexname=#{index_name}" # Send index command to one node. That node will distribute the command to other nodes. @@ -583,20 +600,21 @@ return if response.start_with?('FAIL:201') raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::INDEX_GENERIC, "Drop index failed: #{response}") end - def request_info(*commands) - @cluster.request_info(@default_policy, *commands) + def request_info(*commands, policy: nil) + policy = create_policy(policy, Policy, default_info_policy) + @cluster.request_info(policy, *commands) end #------------------------------------------------------- # Scan Operations #------------------------------------------------------- def scan_all(namespace, set_name, bin_names = nil, options = nil) - policy = create_policy(options, ScanPolicy) + policy = create_policy(options, ScanPolicy, default_scan_policy) # wait until all migrations are finished # TODO: implement # @cluster.WaitUntillMigrationIsFinished(policy.timeout) @@ -648,11 +666,11 @@ end # ScanNode reads all records in specified namespace and set, from one node only. # The policy can be used to specify timeouts. def scan_node(node, namespace, set_name, bin_names = nil, options = nil) - policy = create_policy(options, ScanPolicy) + policy = create_policy(options, ScanPolicy, default_scan_policy) # wait until all migrations are finished # TODO: implement # @cluster.WaitUntillMigrationIsFinished(policy.timeout) # Retry policy must be one-shot for scans. @@ -690,11 +708,11 @@ # record channel. # # This method is only supported by Aerospike 3 servers. # If the policy is nil, a default policy will be generated. def query(statement, options = nil) - policy = create_policy(options, QueryPolicy) + policy = create_policy(options, QueryPolicy, default_query_policy) new_policy = policy.clone nodes = @cluster.nodes if nodes.empty? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.") @@ -726,27 +744,27 @@ #------------------------------------------------------- # Create user with password and roles. Clear-text password will be hashed using bcrypt # before sending to server. def create_user(user, password, roles, options = nil) - policy = create_policy(options, AdminPolicy) + policy = create_policy(options, AdminPolicy, default_admin_policy) hash = AdminCommand.hash_password(password) command = AdminCommand.new command.create_user(@cluster, policy, user, hash, roles) end # Remove user from cluster. def drop_user(user, options = nil) - policy = create_policy(options, AdminPolicy) + policy = create_policy(options, AdminPolicy, default_admin_policy) command = AdminCommand.new command.drop_user(@cluster, policy, user) end # Change user's password. Clear-text password will be hashed using bcrypt before sending to server. def change_password(user, password, options = nil) raise Aerospike::Exceptions::Aerospike.new(INVALID_USER) unless @cluster.user && @cluster.user != "" - policy = create_policy(options, AdminPolicy) + policy = create_policy(options, AdminPolicy, default_admin_policy) hash = AdminCommand.hash_password(password) command = AdminCommand.new if user == @cluster.user @@ -760,40 +778,49 @@ @cluster.change_password(user, hash) end # Add roles to user's list of roles. def grant_roles(user, roles, options = nil) - policy = create_policy(options, AdminPolicy) + policy = create_policy(options, AdminPolicy, default_admin_policy) command = AdminCommand.new command.grant_roles(@cluster, policy, user, roles) end # Remove roles from user's list of roles. def revoke_roles(user, roles, options = nil) - policy = create_policy(options, AdminPolicy) + policy = create_policy(options, AdminPolicy, default_admin_policy) command = AdminCommand.new command.revoke_roles(@cluster, policy, user, roles) end # Retrieve roles for a given user. def query_user(user, options = nil) - policy = create_policy(options, AdminPolicy) + policy = create_policy(options, AdminPolicy, default_admin_policy) command = AdminCommand.new command.query_user(@cluster, policy, user) end # Retrieve all users and their roles. def query_users(options = nil) - policy = create_policy(options, AdminPolicy) + policy = create_policy(options, AdminPolicy, default_admin_policy) command = AdminCommand.new command.query_users(@cluster, policy) end private + def set_default_policies(policies) + self.default_info_policy = create_policy(policies[:info], Policy) + self.default_read_policy = create_policy(policies[:read], Policy) + self.default_admin_policy = create_policy(policies[:admin], AdminPolicy) + self.default_batch_policy = create_policy(policies[:batch], BatchPolicy) + self.default_query_policy = create_policy(policies[:query], QueryPolicy) + self.default_scan_policy = create_policy(policies[:scan], ScanPolicy) + self.default_write_policy = create_policy(policies[:write], WritePolicy) + end + def send_info_command(policy, command) - policy ||= default_policy Aerospike.logger.debug { "Sending info command: #{command}" } _, response = @cluster.request_info(policy, command).first response.to_s end @@ -808,14 +835,14 @@ Bin.new(k, v) end end end - def create_policy(policy, policy_klass) + def create_policy(policy, policy_klass, default_policy = nil) case policy when nil - policy_klass.new + default_policy || policy_klass.new when policy_klass policy when Hash policy_klass.new(policy) else @@ -836,11 +863,11 @@ Aerospike.logger.debug { "Cluster features: #{@cluster.features.get.to_a}" } validators = [] # guard against unsupported particle types unsupported_particle_types = [] - unsupported_particle_types << Aerospike::ParticleType::DOUBLE unless @cluster.supports_feature?("float") - unsupported_particle_types << Aerospike::ParticleType::GEOJSON unless @cluster.supports_feature?("geo") + unsupported_particle_types << Aerospike::ParticleType::DOUBLE unless supports_feature?(Aerospike::Features::FLOAT) + unsupported_particle_types << Aerospike::ParticleType::GEOJSON unless supports_feature?(Aerospike::Features::GEO) validators << UnsupportedParticleTypeValidator.new(*unsupported_particle_types) unless unsupported_particle_types.empty? @command_validators = validators end