lib/aerospike/client.rb in aerospike-2.6.0 vs lib/aerospike/client.rb in aerospike-2.7.0
- old
+ new
@@ -1,6 +1,5 @@
-# encoding: utf-8
# Copyright 2014-2018 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements.
#
@@ -110,11 +109,11 @@
# Examples:
#
# client.put key, {'bin', 'value string'}, :timeout => 0.001
- def put(key, bins, options={})
+ def put(key, bins, options = nil)
policy = create_policy(options, WritePolicy)
command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::WRITE)
execute_command(command)
end
@@ -133,11 +132,11 @@
# Examples:
#
# client.append key, {'bin', 'value to append'}, :timeout => 0.001
- def append(key, bins, options={})
+ def append(key, bins, options = nil)
policy = create_policy(options, WritePolicy)
command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::APPEND)
execute_command(command)
end
@@ -152,11 +151,11 @@
# Examples:
#
# client.prepend key, {'bin', 'value to prepend'}, :timeout => 0.001
- def prepend(key, bins, options={})
+ def prepend(key, bins, options = nil)
policy = create_policy(options, WritePolicy)
command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::PREPEND)
execute_command(command)
end
@@ -175,11 +174,11 @@
# Examples:
#
# client.add key, {'bin', -1}, :timeout => 0.001
- def add(key, bins, options={})
+ def add(key, bins, options = nil)
policy = create_policy(options, WritePolicy)
command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::ADD)
execute_command(command)
end
@@ -197,11 +196,11 @@
# Examples:
#
# existed = client.delete key, :timeout => 0.001
- def delete(key, options={})
+ def delete(key, options = nil)
policy = create_policy(options, WritePolicy)
command = DeleteCommand.new(@cluster, policy, key)
execute_command(command)
command.existed
end
@@ -245,11 +244,11 @@
# Examples:
#
# client.touch key, :timeout => 0.001
- def touch(key, options={})
+ def touch(key, options = nil)
policy = create_policy(options, WritePolicy)
command = TouchCommand.new(@cluster, policy, key)
execute_command(command)
end
@@ -258,52 +257,34 @@
#-------------------------------------------------------
##
# Determines if a record key exists.
# The policy can be used to specify timeouts.
- def exists(key, options={})
+ def exists(key, options = nil)
policy = create_policy(options, Policy)
command = ExistsCommand.new(@cluster, policy, key)
execute_command(command)
command.exists
end
- # Check if multiple record keys exist in one batch call.
- # The returned array bool is in positional order with the original key array order.
- # The policy can be used to specify timeouts.
- def batch_exists(keys, options={})
- policy = create_policy(options, Policy)
-
- # same array can be used without sychronization;
- # when a key exists, the corresponding index will be marked true
- exists_array = Array.new(keys.length)
-
- key_map = BatchItem.generate_map(keys)
-
- batch_execute(keys) do |node, bns|
- BatchCommandExists.new(node, bns, policy, key_map, exists_array)
- end
- exists_array
- end
-
#-------------------------------------------------------
# Read Record Operations
#-------------------------------------------------------
# Read record header and bins for specified key.
# The policy can be used to specify timeouts.
- def get(key, bin_names=[], options={})
+ def get(key, bin_names = nil, options = nil)
policy = create_policy(options, Policy)
command = ReadCommand.new(@cluster, policy, key, bin_names)
execute_command(command)
command.record
end
# Read record generation and expiration only for specified key. Bins are not read.
# The policy can be used to specify timeouts.
- def get_header(key, options={})
+ def get_header(key, options = nil)
policy = create_policy(options, Policy)
command = ReadHeaderCommand.new(@cluster, policy, key)
execute_command(command)
command.record
end
@@ -313,63 +294,77 @@
#-------------------------------------------------------
# Read multiple record headers and bins for specified keys in one batch call.
# The returned records are in positional order with the original key array order.
# If a key is not found, the positional record will be nil.
- # The policy can be used to specify timeouts.
- def batch_get(keys, bin_names=[], options={})
- policy = create_policy(options, Policy)
+ # The policy can be used to specify timeouts and protocol type.
+ def batch_get(keys, bin_names = nil, options = nil)
+ policy = create_policy(options, BatchPolicy)
+ results = Array.new(keys.length)
+ info_flags = INFO1_READ
- # wait until all migrations are finished
- # TODO: implement
- # @cluster.WaitUntillMigrationIsFinished(policy.timeout)
+ case bin_names
+ when :all, nil, []
+ info_flags |= INFO1_GET_ALL
+ bin_names = nil
+ when :none
+ info_flags |= INFO1_NOBINDATA
+ bin_names = nil
+ end
- # same array can be used without sychronization;
- # when a key exists, the corresponding index will be set to record
- records = Array.new(keys.length)
-
- key_map = BatchItem.generate_map(keys)
-
- batch_execute(keys) do |node, bns|
- BatchCommandGet.new(node, bns, policy, key_map, bin_names.uniq, records, INFO1_READ)
+ if policy.use_batch_direct
+ key_map = BatchItem.generate_map(keys)
+ execute_batch_direct_commands(keys) do |node, batch|
+ BatchDirectCommand.new(node, batch, policy, key_map, bin_names, results, info_flags)
+ end
+ else
+ execute_batch_index_commands(keys) do |node, batch|
+ BatchIndexCommand.new(node, batch, policy, bin_names, results, info_flags)
+ end
end
- records
+
+ results
end
# Read multiple record header data for specified keys in one batch call.
# The returned records are in positional order with the original key array order.
# If a key is not found, the positional record will be nil.
- # The policy can be used to specify timeouts.
- def batch_get_header(keys, options={})
- policy = create_policy(options, Policy)
+ # The policy can be used to specify timeouts and protocol type.
+ def batch_get_header(keys, options = nil)
+ batch_get(keys, :none, options)
+ end
- # wait until all migrations are finished
- # TODO: Fix this and implement
- # @cluster.WaitUntillMigrationIsFinished(policy.timeout)
+ # Check if multiple record keys exist in one batch call.
+ # The returned boolean array is in positional order with the original key array order.
+ # The policy can be used to specify timeouts and protocol type.
+ def batch_exists(keys, options = nil)
+ policy = create_policy(options, BatchPolicy)
+ results = Array.new(keys.length)
- # same array can be used without sychronization;
- # when a key exists, the corresponding index will be set to record
- records = Array.new(keys.length)
-
- key_map = BatchItem.generate_map(keys)
-
- batch_execute(keys) do |node, bns|
- BatchCommandGet.new(node, bns, policy, key_map, nil, records, INFO1_READ | INFO1_NOBINDATA)
+ if policy.use_batch_direct
+ key_map = BatchItem.generate_map(keys)
+ execute_batch_direct_commands(keys) do |node, batch|
+ BatchDirectExistsCommand.new(node, batch, policy, key_map, results)
+ end
+ else
+ execute_batch_index_commands(keys) do |node, batch|
+ BatchIndexExistsCommand.new(node, batch, policy, results)
+ end
end
- records
+ results
end
#-------------------------------------------------------
# Generic Database Operations
#-------------------------------------------------------
# Perform multiple read/write operations on a single key in one batch call.
# An example would be to add an integer value to an existing record and then
# read the result, all in one database call. Operations are executed in
# the order they are specified.
- def operate(key, operations, options={})
+ def operate(key, operations, options = nil)
policy = create_policy(options, WritePolicy)
command = OperateCommand.new(@cluster, policy, key, operations)
execute_command(command)
command.record
@@ -383,22 +378,22 @@
# This asynchronous server call will return before command is complete.
# The user can optionally wait for command completion by using the returned
# RegisterTask instance.
#
# This method is only supported by Aerospike 3 servers.
- def register_udf_from_file(client_path, server_path, language, options={})
+ def register_udf_from_file(client_path, server_path, language, options = nil)
udf_body = File.read(client_path)
register_udf(udf_body, server_path, language, options)
end
# Register package containing user defined functions with server.
# This asynchronous server call will return before command is complete.
# The user can optionally wait for command completion by using the returned
# RegisterTask instance.
#
# This method is only supported by Aerospike 3 servers.
- def register_udf(udf_body, server_path, language, options={})
+ def register_udf(udf_body, server_path, language, options = nil)
content = Base64.strict_encode64(udf_body).force_encoding('binary')
str_cmd = "udf-put:filename=#{server_path};content=#{content};"
str_cmd << "content-len=#{content.length};udf-type=#{language};"
# Send UDF to one node. That node will distribute the UDF to other nodes.
@@ -424,11 +419,11 @@
# This asynchronous server call will return before command is complete.
# The user can optionally wait for command completion by using the returned
# RemoveTask instance.
#
# This method is only supported by Aerospike 3 servers.
- def remove_udf(udf_name, options={})
+ def remove_udf(udf_name, options = nil)
str_cmd = "udf-remove:filename=#{udf_name};"
# Send command to one node. That node will distribute it to other nodes.
# Send UDF to one node. That node will distribute the UDF to other nodes.
response_map = @cluster.request_info(@default_policy, str_cmd)
@@ -441,11 +436,11 @@
end
end
# ListUDF lists all packages containing user defined functions in the server.
# This method is only supported by Aerospike 3 servers.
- def list_udf(options={})
+ def list_udf(options = nil)
str_cmd = 'udf-list'
# Send command to one node. That node will distribute it to other nodes.
response_map = @cluster.request_info(@default_policy, str_cmd)
_, response = response_map.first
@@ -477,11 +472,11 @@
# The package name is used to locate the udf file location:
#
# udf file = <server udf dir>/<package name>.lua
#
# This method is only supported by Aerospike 3 servers.
- def execute_udf(key, package_name, function_name, args=[], options={})
+ def execute_udf(key, package_name, function_name, args=[], options = nil)
policy = create_policy(options, WritePolicy)
command = ExecuteCommand.new(@cluster, policy, key, package_name, function_name, args)
execute_command(command)
@@ -506,11 +501,11 @@
# The user can optionally wait for command completion by using the returned
# ExecuteTask instance.
#
# This method is only supported by Aerospike 3 servers.
# If the policy is nil, the default relevant policy will be used.
- def execute_udf_on_query(statement, package_name, function_name, function_args=[], options={})
+ def execute_udf_on_query(statement, package_name, function_name, function_args=[], options = nil)
policy = create_policy(options, QueryPolicy)
nodes = @cluster.nodes
if nodes.empty?
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing UDF failed because cluster is empty.")
@@ -543,11 +538,11 @@
# IndexTask instance.
#
# This method is only supported by Aerospike 3 servers.
# index_type should be :string, :numeric or :geo2dsphere (requires server version 3.7 or later)
# collection_type should be :list, :mapkeys or :mapvalues
- def create_index(namespace, set_name, index_name, bin_name, index_type, collection_type=nil, options={})
+ def create_index(namespace, set_name, index_name, bin_name, index_type, collection_type=nil, options = nil)
if options.nil? && collection_type.is_a?(Hash)
options, collection_type = collection_type, nil
end
policy = create_policy(options, WritePolicy)
str_cmd = "sindex-create:ns=#{namespace}"
@@ -572,11 +567,11 @@
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::INDEX_GENERIC, "Create index failed: #{response}")
end
# Delete secondary index.
# This method is only supported by Aerospike 3 servers.
- def drop_index(namespace, set_name, index_name, options={})
+ def drop_index(namespace, set_name, index_name, options = nil)
policy = create_policy(options, WritePolicy)
str_cmd = "sindex-delete:ns=#{namespace}"
str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty?
str_cmd << ";indexname=#{index_name}"
@@ -596,11 +591,11 @@
#-------------------------------------------------------
# Scan Operations
#-------------------------------------------------------
- def scan_all(namespace, set_name, bin_names=[], options={})
+ def scan_all(namespace, set_name, bin_names = nil, options = nil)
policy = create_policy(options, ScanPolicy)
# wait until all migrations are finished
# TODO: implement
# @cluster.WaitUntillMigrationIsFinished(policy.timeout)
@@ -652,11 +647,11 @@
recordset
end
# ScanNode reads all records in specified namespace and set, from one node only.
# The policy can be used to specify timeouts.
- def scan_node(node, namespace, set_name, bin_names=[], options={})
+ def scan_node(node, namespace, set_name, bin_names = nil, options = nil)
policy = create_policy(options, ScanPolicy)
# wait until all migrations are finished
# TODO: implement
# @cluster.WaitUntillMigrationIsFinished(policy.timeout)
@@ -694,11 +689,11 @@
# The caller can concurrently pops records off the channel through the
# record channel.
#
# This method is only supported by Aerospike 3 servers.
# If the policy is nil, a default policy will be generated.
- def query(statement, options={})
+ def query(statement, options = nil)
policy = create_policy(options, QueryPolicy)
new_policy = policy.clone
nodes = @cluster.nodes
if nodes.empty?
@@ -730,26 +725,26 @@
# User administration
#-------------------------------------------------------
# Create user with password and roles. Clear-text password will be hashed using bcrypt
# before sending to server.
- def create_user(user, password, roles, options={})
+ def create_user(user, password, roles, options = nil)
policy = create_policy(options, AdminPolicy)
hash = AdminCommand.hash_password(password)
command = AdminCommand.new
command.create_user(@cluster, policy, user, hash, roles)
end
# Remove user from cluster.
- def drop_user(user, options={})
+ def drop_user(user, options = nil)
policy = create_policy(options, AdminPolicy)
command = AdminCommand.new
command.drop_user(@cluster, policy, user)
end
# Change user's password. Clear-text password will be hashed using bcrypt before sending to server.
- def change_password(user, password, options={})
+ def change_password(user, password, options = nil)
raise Aerospike::Exceptions::Aerospike.new(INVALID_USER) unless @cluster.user && @cluster.user != ""
policy = create_policy(options, AdminPolicy)
hash = AdminCommand.hash_password(password)
command = AdminCommand.new
@@ -764,32 +759,32 @@
@cluster.change_password(user, hash)
end
# Add roles to user's list of roles.
- def grant_roles(user, roles, options={})
+ def grant_roles(user, roles, options = nil)
policy = create_policy(options, AdminPolicy)
command = AdminCommand.new
command.grant_roles(@cluster, policy, user, roles)
end
# Remove roles from user's list of roles.
- def revoke_roles(user, roles, options={})
+ def revoke_roles(user, roles, options = nil)
policy = create_policy(options, AdminPolicy)
command = AdminCommand.new
command.revoke_roles(@cluster, policy, user, roles)
end
# Retrieve roles for a given user.
- def query_user(user, options={})
+ def query_user(user, options = nil)
policy = create_policy(options, AdminPolicy)
command = AdminCommand.new
command.query_user(@cluster, policy, user)
end
# Retrieve all users and their roles.
- def query_users(options={})
+ def query_users(options = nil)
policy = create_policy(options, AdminPolicy)
command = AdminCommand.new
command.query_users(@cluster, policy)
end
@@ -860,21 +855,44 @@
def execute_command(command)
validate_command(command)
command.execute
end
- def batch_execute(keys)
- batch_nodes = BatchNode.generate_list(@cluster, keys)
+ def execute_batch_index_commands(keys)
+ if @cluster.nodes.empty?
+ raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing Batch Index command failed because cluster is empty.")
+ end
+
+ batch_nodes = BatchIndexNode.generate_list(@cluster, keys)
threads = []
+ batch_nodes.each do |batch|
+ threads << Thread.new do
+ Thread.current.abort_on_exception = true
+ command = yield batch.node, batch
+ execute_command(command)
+ end
+ end
+
+ threads.each(&:join)
+ end
+
+ def execute_batch_direct_commands(keys)
+ if @cluster.nodes.empty?
+ raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing Batch Direct command failed because cluster is empty.")
+ end
+
+ batch_nodes = BatchDirectNode.generate_list(@cluster, keys)
+ threads = []
+
# Use a thread per namespace per node
batch_nodes.each do |batch_node|
# copy to avoid race condition
bn = batch_node
- bn.batch_namespaces.each do |bns|
+ bn.batch_namespaces.each do |batch|
threads << Thread.new do
Thread.current.abort_on_exception = true
- command = yield bn.node, bns
+ command = yield batch_node.node, batch
execute_command(command)
end
end
end