lib/aerospike/client.rb in aerospike-2.6.0 vs lib/aerospike/client.rb in aerospike-2.7.0

- old
+ new

@@ -1,6 +1,5 @@ -# encoding: utf-8 # Copyright 2014-2018 Aerospike, Inc. # # Portions may be licensed to Aerospike, Inc. under one or more contributor # license agreements. # @@ -110,11 +109,11 @@ # Examples: # # client.put key, {'bin', 'value string'}, :timeout => 0.001 - def put(key, bins, options={}) + def put(key, bins, options = nil) policy = create_policy(options, WritePolicy) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::WRITE) execute_command(command) end @@ -133,11 +132,11 @@ # Examples: # # client.append key, {'bin', 'value to append'}, :timeout => 0.001 - def append(key, bins, options={}) + def append(key, bins, options = nil) policy = create_policy(options, WritePolicy) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::APPEND) execute_command(command) end @@ -152,11 +151,11 @@ # Examples: # # client.prepend key, {'bin', 'value to prepend'}, :timeout => 0.001 - def prepend(key, bins, options={}) + def prepend(key, bins, options = nil) policy = create_policy(options, WritePolicy) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::PREPEND) execute_command(command) end @@ -175,11 +174,11 @@ # Examples: # # client.add key, {'bin', -1}, :timeout => 0.001 - def add(key, bins, options={}) + def add(key, bins, options = nil) policy = create_policy(options, WritePolicy) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::ADD) execute_command(command) end @@ -197,11 +196,11 @@ # Examples: # # existed = client.delete key, :timeout => 0.001 - def delete(key, options={}) + def delete(key, options = nil) policy = create_policy(options, WritePolicy) command = DeleteCommand.new(@cluster, policy, key) execute_command(command) command.existed end @@ -245,11 +244,11 @@ # Examples: # # client.touch key, :timeout => 0.001 - def touch(key, options={}) + def touch(key, options = nil) policy = create_policy(options, WritePolicy) command = TouchCommand.new(@cluster, policy, key) execute_command(command) end @@ -258,52 +257,34 @@ #------------------------------------------------------- ## # Determines if a record key exists. # The policy can be used to specify timeouts. - def exists(key, options={}) + def exists(key, options = nil) policy = create_policy(options, Policy) command = ExistsCommand.new(@cluster, policy, key) execute_command(command) command.exists end - # Check if multiple record keys exist in one batch call. - # The returned array bool is in positional order with the original key array order. - # The policy can be used to specify timeouts. - def batch_exists(keys, options={}) - policy = create_policy(options, Policy) - - # same array can be used without sychronization; - # when a key exists, the corresponding index will be marked true - exists_array = Array.new(keys.length) - - key_map = BatchItem.generate_map(keys) - - batch_execute(keys) do |node, bns| - BatchCommandExists.new(node, bns, policy, key_map, exists_array) - end - exists_array - end - #------------------------------------------------------- # Read Record Operations #------------------------------------------------------- # Read record header and bins for specified key. # The policy can be used to specify timeouts. - def get(key, bin_names=[], options={}) + def get(key, bin_names = nil, options = nil) policy = create_policy(options, Policy) command = ReadCommand.new(@cluster, policy, key, bin_names) execute_command(command) command.record end # Read record generation and expiration only for specified key. Bins are not read. # The policy can be used to specify timeouts. - def get_header(key, options={}) + def get_header(key, options = nil) policy = create_policy(options, Policy) command = ReadHeaderCommand.new(@cluster, policy, key) execute_command(command) command.record end @@ -313,63 +294,77 @@ #------------------------------------------------------- # Read multiple record headers and bins for specified keys in one batch call. # The returned records are in positional order with the original key array order. # If a key is not found, the positional record will be nil. - # The policy can be used to specify timeouts. - def batch_get(keys, bin_names=[], options={}) - policy = create_policy(options, Policy) + # The policy can be used to specify timeouts and protocol type. + def batch_get(keys, bin_names = nil, options = nil) + policy = create_policy(options, BatchPolicy) + results = Array.new(keys.length) + info_flags = INFO1_READ - # wait until all migrations are finished - # TODO: implement - # @cluster.WaitUntillMigrationIsFinished(policy.timeout) + case bin_names + when :all, nil, [] + info_flags |= INFO1_GET_ALL + bin_names = nil + when :none + info_flags |= INFO1_NOBINDATA + bin_names = nil + end - # same array can be used without sychronization; - # when a key exists, the corresponding index will be set to record - records = Array.new(keys.length) - - key_map = BatchItem.generate_map(keys) - - batch_execute(keys) do |node, bns| - BatchCommandGet.new(node, bns, policy, key_map, bin_names.uniq, records, INFO1_READ) + if policy.use_batch_direct + key_map = BatchItem.generate_map(keys) + execute_batch_direct_commands(keys) do |node, batch| + BatchDirectCommand.new(node, batch, policy, key_map, bin_names, results, info_flags) + end + else + execute_batch_index_commands(keys) do |node, batch| + BatchIndexCommand.new(node, batch, policy, bin_names, results, info_flags) + end end - records + + results end # Read multiple record header data for specified keys in one batch call. # The returned records are in positional order with the original key array order. # If a key is not found, the positional record will be nil. - # The policy can be used to specify timeouts. - def batch_get_header(keys, options={}) - policy = create_policy(options, Policy) + # The policy can be used to specify timeouts and protocol type. + def batch_get_header(keys, options = nil) + batch_get(keys, :none, options) + end - # wait until all migrations are finished - # TODO: Fix this and implement - # @cluster.WaitUntillMigrationIsFinished(policy.timeout) + # Check if multiple record keys exist in one batch call. + # The returned boolean array is in positional order with the original key array order. + # The policy can be used to specify timeouts and protocol type. + def batch_exists(keys, options = nil) + policy = create_policy(options, BatchPolicy) + results = Array.new(keys.length) - # same array can be used without sychronization; - # when a key exists, the corresponding index will be set to record - records = Array.new(keys.length) - - key_map = BatchItem.generate_map(keys) - - batch_execute(keys) do |node, bns| - BatchCommandGet.new(node, bns, policy, key_map, nil, records, INFO1_READ | INFO1_NOBINDATA) + if policy.use_batch_direct + key_map = BatchItem.generate_map(keys) + execute_batch_direct_commands(keys) do |node, batch| + BatchDirectExistsCommand.new(node, batch, policy, key_map, results) + end + else + execute_batch_index_commands(keys) do |node, batch| + BatchIndexExistsCommand.new(node, batch, policy, results) + end end - records + results end #------------------------------------------------------- # Generic Database Operations #------------------------------------------------------- # Perform multiple read/write operations on a single key in one batch call. # An example would be to add an integer value to an existing record and then # read the result, all in one database call. Operations are executed in # the order they are specified. - def operate(key, operations, options={}) + def operate(key, operations, options = nil) policy = create_policy(options, WritePolicy) command = OperateCommand.new(@cluster, policy, key, operations) execute_command(command) command.record @@ -383,22 +378,22 @@ # This asynchronous server call will return before command is complete. # The user can optionally wait for command completion by using the returned # RegisterTask instance. # # This method is only supported by Aerospike 3 servers. - def register_udf_from_file(client_path, server_path, language, options={}) + def register_udf_from_file(client_path, server_path, language, options = nil) udf_body = File.read(client_path) register_udf(udf_body, server_path, language, options) end # Register package containing user defined functions with server. # This asynchronous server call will return before command is complete. # The user can optionally wait for command completion by using the returned # RegisterTask instance. # # This method is only supported by Aerospike 3 servers. - def register_udf(udf_body, server_path, language, options={}) + def register_udf(udf_body, server_path, language, options = nil) content = Base64.strict_encode64(udf_body).force_encoding('binary') str_cmd = "udf-put:filename=#{server_path};content=#{content};" str_cmd << "content-len=#{content.length};udf-type=#{language};" # Send UDF to one node. That node will distribute the UDF to other nodes. @@ -424,11 +419,11 @@ # This asynchronous server call will return before command is complete. # The user can optionally wait for command completion by using the returned # RemoveTask instance. # # This method is only supported by Aerospike 3 servers. - def remove_udf(udf_name, options={}) + def remove_udf(udf_name, options = nil) str_cmd = "udf-remove:filename=#{udf_name};" # Send command to one node. That node will distribute it to other nodes. # Send UDF to one node. That node will distribute the UDF to other nodes. response_map = @cluster.request_info(@default_policy, str_cmd) @@ -441,11 +436,11 @@ end end # ListUDF lists all packages containing user defined functions in the server. # This method is only supported by Aerospike 3 servers. - def list_udf(options={}) + def list_udf(options = nil) str_cmd = 'udf-list' # Send command to one node. That node will distribute it to other nodes. response_map = @cluster.request_info(@default_policy, str_cmd) _, response = response_map.first @@ -477,11 +472,11 @@ # The package name is used to locate the udf file location: # # udf file = <server udf dir>/<package name>.lua # # This method is only supported by Aerospike 3 servers. - def execute_udf(key, package_name, function_name, args=[], options={}) + def execute_udf(key, package_name, function_name, args=[], options = nil) policy = create_policy(options, WritePolicy) command = ExecuteCommand.new(@cluster, policy, key, package_name, function_name, args) execute_command(command) @@ -506,11 +501,11 @@ # The user can optionally wait for command completion by using the returned # ExecuteTask instance. # # This method is only supported by Aerospike 3 servers. # If the policy is nil, the default relevant policy will be used. - def execute_udf_on_query(statement, package_name, function_name, function_args=[], options={}) + def execute_udf_on_query(statement, package_name, function_name, function_args=[], options = nil) policy = create_policy(options, QueryPolicy) nodes = @cluster.nodes if nodes.empty? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing UDF failed because cluster is empty.") @@ -543,11 +538,11 @@ # IndexTask instance. # # This method is only supported by Aerospike 3 servers. # index_type should be :string, :numeric or :geo2dsphere (requires server version 3.7 or later) # collection_type should be :list, :mapkeys or :mapvalues - def create_index(namespace, set_name, index_name, bin_name, index_type, collection_type=nil, options={}) + def create_index(namespace, set_name, index_name, bin_name, index_type, collection_type=nil, options = nil) if options.nil? && collection_type.is_a?(Hash) options, collection_type = collection_type, nil end policy = create_policy(options, WritePolicy) str_cmd = "sindex-create:ns=#{namespace}" @@ -572,11 +567,11 @@ raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::INDEX_GENERIC, "Create index failed: #{response}") end # Delete secondary index. # This method is only supported by Aerospike 3 servers. - def drop_index(namespace, set_name, index_name, options={}) + def drop_index(namespace, set_name, index_name, options = nil) policy = create_policy(options, WritePolicy) str_cmd = "sindex-delete:ns=#{namespace}" str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty? str_cmd << ";indexname=#{index_name}" @@ -596,11 +591,11 @@ #------------------------------------------------------- # Scan Operations #------------------------------------------------------- - def scan_all(namespace, set_name, bin_names=[], options={}) + def scan_all(namespace, set_name, bin_names = nil, options = nil) policy = create_policy(options, ScanPolicy) # wait until all migrations are finished # TODO: implement # @cluster.WaitUntillMigrationIsFinished(policy.timeout) @@ -652,11 +647,11 @@ recordset end # ScanNode reads all records in specified namespace and set, from one node only. # The policy can be used to specify timeouts. - def scan_node(node, namespace, set_name, bin_names=[], options={}) + def scan_node(node, namespace, set_name, bin_names = nil, options = nil) policy = create_policy(options, ScanPolicy) # wait until all migrations are finished # TODO: implement # @cluster.WaitUntillMigrationIsFinished(policy.timeout) @@ -694,11 +689,11 @@ # The caller can concurrently pops records off the channel through the # record channel. # # This method is only supported by Aerospike 3 servers. # If the policy is nil, a default policy will be generated. - def query(statement, options={}) + def query(statement, options = nil) policy = create_policy(options, QueryPolicy) new_policy = policy.clone nodes = @cluster.nodes if nodes.empty? @@ -730,26 +725,26 @@ # User administration #------------------------------------------------------- # Create user with password and roles. Clear-text password will be hashed using bcrypt # before sending to server. - def create_user(user, password, roles, options={}) + def create_user(user, password, roles, options = nil) policy = create_policy(options, AdminPolicy) hash = AdminCommand.hash_password(password) command = AdminCommand.new command.create_user(@cluster, policy, user, hash, roles) end # Remove user from cluster. - def drop_user(user, options={}) + def drop_user(user, options = nil) policy = create_policy(options, AdminPolicy) command = AdminCommand.new command.drop_user(@cluster, policy, user) end # Change user's password. Clear-text password will be hashed using bcrypt before sending to server. - def change_password(user, password, options={}) + def change_password(user, password, options = nil) raise Aerospike::Exceptions::Aerospike.new(INVALID_USER) unless @cluster.user && @cluster.user != "" policy = create_policy(options, AdminPolicy) hash = AdminCommand.hash_password(password) command = AdminCommand.new @@ -764,32 +759,32 @@ @cluster.change_password(user, hash) end # Add roles to user's list of roles. - def grant_roles(user, roles, options={}) + def grant_roles(user, roles, options = nil) policy = create_policy(options, AdminPolicy) command = AdminCommand.new command.grant_roles(@cluster, policy, user, roles) end # Remove roles from user's list of roles. - def revoke_roles(user, roles, options={}) + def revoke_roles(user, roles, options = nil) policy = create_policy(options, AdminPolicy) command = AdminCommand.new command.revoke_roles(@cluster, policy, user, roles) end # Retrieve roles for a given user. - def query_user(user, options={}) + def query_user(user, options = nil) policy = create_policy(options, AdminPolicy) command = AdminCommand.new command.query_user(@cluster, policy, user) end # Retrieve all users and their roles. - def query_users(options={}) + def query_users(options = nil) policy = create_policy(options, AdminPolicy) command = AdminCommand.new command.query_users(@cluster, policy) end @@ -860,21 +855,44 @@ def execute_command(command) validate_command(command) command.execute end - def batch_execute(keys) - batch_nodes = BatchNode.generate_list(@cluster, keys) + def execute_batch_index_commands(keys) + if @cluster.nodes.empty? + raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing Batch Index command failed because cluster is empty.") + end + + batch_nodes = BatchIndexNode.generate_list(@cluster, keys) threads = [] + batch_nodes.each do |batch| + threads << Thread.new do + Thread.current.abort_on_exception = true + command = yield batch.node, batch + execute_command(command) + end + end + + threads.each(&:join) + end + + def execute_batch_direct_commands(keys) + if @cluster.nodes.empty? + raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing Batch Direct command failed because cluster is empty.") + end + + batch_nodes = BatchDirectNode.generate_list(@cluster, keys) + threads = [] + # Use a thread per namespace per node batch_nodes.each do |batch_node| # copy to avoid race condition bn = batch_node - bn.batch_namespaces.each do |bns| + bn.batch_namespaces.each do |batch| threads << Thread.new do Thread.current.abort_on_exception = true - command = yield bn.node, bns + command = yield batch_node.node, batch execute_command(command) end end end