lib/aerospike/client.rb in aerospike-2.8.0 vs lib/aerospike/client.rb in aerospike-2.9.0
- old
+ new
@@ -35,22 +35,23 @@
# #=> raises Aerospike::Exceptions::Timeout if a +:timeout+ is specified and
# +:fail_if_not_connected+ set to true
class Client
- attr_accessor :default_policy, :default_write_policy,
- :default_scan_policy, :default_query_policy, :default_admin_policy
+ attr_accessor :default_admin_policy
+ attr_accessor :default_batch_policy
+ attr_accessor :default_info_policy
+ attr_accessor :default_query_policy
+ attr_accessor :default_read_policy
+ attr_accessor :default_scan_policy
+ attr_accessor :default_write_policy
def initialize(hosts = nil, policy: ClientPolicy.new, connect: true)
- @default_policy = Policy.new
- @default_write_policy = WritePolicy.new
- @default_scan_policy = ScanPolicy.new
- @default_query_policy = QueryPolicy.new
- @default_admin_policy = QueryPolicy.new
hosts = ::Aerospike::Host::Parse.(hosts || ENV['AEROSPIKE_HOSTS'] || 'localhost')
policy = create_policy(policy, ClientPolicy)
+ set_default_policies(policy.policies)
@cluster = Cluster.new(policy, hosts)
@cluster.add_cluster_config_change_listener(self)
self.connect if connect
self
@@ -110,11 +111,11 @@
# Examples:
#
# client.put key, {'bin', 'value string'}, :timeout => 0.001
def put(key, bins, options = nil)
- policy = create_policy(options, WritePolicy)
+ policy = create_policy(options, WritePolicy, default_write_policy)
command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::WRITE)
execute_command(command)
end
#-------------------------------------------------------
@@ -133,11 +134,11 @@
# Examples:
#
# client.append key, {'bin', 'value to append'}, :timeout => 0.001
def append(key, bins, options = nil)
- policy = create_policy(options, WritePolicy)
+ policy = create_policy(options, WritePolicy, default_write_policy)
command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::APPEND)
execute_command(command)
end
##
@@ -152,11 +153,11 @@
# Examples:
#
# client.prepend key, {'bin', 'value to prepend'}, :timeout => 0.001
def prepend(key, bins, options = nil)
- policy = create_policy(options, WritePolicy)
+ policy = create_policy(options, WritePolicy, default_write_policy)
command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::PREPEND)
execute_command(command)
end
#-------------------------------------------------------
@@ -175,11 +176,11 @@
# Examples:
#
# client.add key, {'bin', -1}, :timeout => 0.001
def add(key, bins, options = nil)
- policy = create_policy(options, WritePolicy)
+ policy = create_policy(options, WritePolicy, default_write_policy)
command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::ADD)
execute_command(command)
end
#-------------------------------------------------------
@@ -197,11 +198,11 @@
# Examples:
#
# existed = client.delete key, :timeout => 0.001
def delete(key, options = nil)
- policy = create_policy(options, WritePolicy)
+ policy = create_policy(options, WritePolicy, default_write_policy)
command = DeleteCommand.new(@cluster, policy, key)
execute_command(command)
command.existed
end
@@ -219,15 +220,23 @@
# cut-off (set at the time of the truncate call.)
#
# If no policy options are provided, +@default_info_policy+ will be used.
def truncate(namespace, set_name = nil, before_last_update = nil, options = {})
- policy = create_policy(options, WritePolicy)
+ policy = create_policy(options, Policy, default_info_policy)
+
str_cmd = "truncate:namespace=#{namespace}"
str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty?
- str_cmd << ";lut=#{(before_last_update.to_f * 1_000_000_000.0).round}" if before_last_update
+ if before_last_update
+ lut_nanos = (before_last_update.to_f * 1_000_000_000.0).round
+ str_cmd << ";lut=#{lut_nanos}"
+ elsif supports_feature?(Aerospike::Features::LUT_NOW)
+ # Servers >= 4.3.1.4 require lut argument
+ str_cmd << ";lut=now"
+ end
+
# Send index command to one node. That node will distribute the command to other nodes.
response = send_info_command(policy, str_cmd).upcase
return if response == 'OK'
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_ERROR, "Truncate failed: #{response}")
end
@@ -245,11 +254,11 @@
# Examples:
#
# client.touch key, :timeout => 0.001
def touch(key, options = nil)
- policy = create_policy(options, WritePolicy)
+ policy = create_policy(options, WritePolicy, default_write_policy)
command = TouchCommand.new(@cluster, policy, key)
execute_command(command)
end
#-------------------------------------------------------
@@ -258,11 +267,11 @@
##
# Determines if a record key exists.
# The policy can be used to specify timeouts.
def exists(key, options = nil)
- policy = create_policy(options, Policy)
+ policy = create_policy(options, Policy, default_read_policy)
command = ExistsCommand.new(@cluster, policy, key)
execute_command(command)
command.exists
end
@@ -271,21 +280,21 @@
#-------------------------------------------------------
# Read record header and bins for specified key.
# The policy can be used to specify timeouts.
def get(key, bin_names = nil, options = nil)
- policy = create_policy(options, Policy)
+ policy = create_policy(options, Policy, default_read_policy)
command = ReadCommand.new(@cluster, policy, key, bin_names)
execute_command(command)
command.record
end
# Read record generation and expiration only for specified key. Bins are not read.
# The policy can be used to specify timeouts.
def get_header(key, options = nil)
- policy = create_policy(options, Policy)
+ policy = create_policy(options, Policy, default_read_policy)
command = ReadHeaderCommand.new(@cluster, policy, key)
execute_command(command)
command.record
end
@@ -296,11 +305,11 @@
# Read multiple record headers and bins for specified keys in one batch call.
# The returned records are in positional order with the original key array order.
# If a key is not found, the positional record will be nil.
# The policy can be used to specify timeouts and protocol type.
def batch_get(keys, bin_names = nil, options = nil)
- policy = create_policy(options, BatchPolicy)
+ policy = create_policy(options, BatchPolicy, default_batch_policy)
results = Array.new(keys.length)
info_flags = INFO1_READ
case bin_names
when :all, nil, []
@@ -335,11 +344,11 @@
# Check if multiple record keys exist in one batch call.
# The returned boolean array is in positional order with the original key array order.
# The policy can be used to specify timeouts and protocol type.
def batch_exists(keys, options = nil)
- policy = create_policy(options, BatchPolicy)
+ policy = create_policy(options, BatchPolicy, default_batch_policy)
results = Array.new(keys.length)
if policy.use_batch_direct
key_map = BatchItem.generate_map(keys)
execute_batch_direct_commands(keys) do |node, batch|
@@ -361,11 +370,11 @@
# Perform multiple read/write operations on a single key in one batch call.
# An example would be to add an integer value to an existing record and then
# read the result, all in one database call. Operations are executed in
# the order they are specified.
def operate(key, operations, options = nil)
- policy = create_policy(options, WritePolicy)
+ policy = create_policy(options, WritePolicy, default_write_policy)
command = OperateCommand.new(@cluster, policy, key, operations)
execute_command(command)
command.record
end
@@ -390,16 +399,18 @@
# The user can optionally wait for command completion by using the returned
# RegisterTask instance.
#
# This method is only supported by Aerospike 3 servers.
def register_udf(udf_body, server_path, language, options = nil)
- content = Base64.strict_encode64(udf_body).force_encoding('binary')
+ policy = create_policy(options, Policy, default_info_policy)
+ content = Base64.strict_encode64(udf_body).force_encoding('binary')
str_cmd = "udf-put:filename=#{server_path};content=#{content};"
str_cmd << "content-len=#{content.length};udf-type=#{language};"
+
# Send UDF to one node. That node will distribute the UDF to other nodes.
- response_map = @cluster.request_info(@default_policy, str_cmd)
+ response_map = @cluster.request_info(policy, str_cmd)
res = {}
response_map.each do |k, response|
vals = response.to_s.split(';')
vals.each do |pair|
@@ -420,15 +431,17 @@
# The user can optionally wait for command completion by using the returned
# RemoveTask instance.
#
# This method is only supported by Aerospike 3 servers.
def remove_udf(udf_name, options = nil)
+ policy = create_policy(options, Policy, default_info_policy)
+
str_cmd = "udf-remove:filename=#{udf_name};"
# Send command to one node. That node will distribute it to other nodes.
# Send UDF to one node. That node will distribute the UDF to other nodes.
- response_map = @cluster.request_info(@default_policy, str_cmd)
+ response_map = @cluster.request_info(policy, str_cmd)
_, response = response_map.first
if response == 'ok'
UdfRemoveTask.new(@cluster, udf_name)
else
@@ -437,14 +450,16 @@
end
# ListUDF lists all packages containing user defined functions in the server.
# This method is only supported by Aerospike 3 servers.
def list_udf(options = nil)
+ policy = create_policy(options, Policy, default_info_policy)
+
str_cmd = 'udf-list'
# Send command to one node. That node will distribute it to other nodes.
- response_map = @cluster.request_info(@default_policy, str_cmd)
+ response_map = @cluster.request_info(policy, str_cmd)
_, response = response_map.first
vals = response.split(';')
vals.map do |udf_info|
@@ -473,11 +488,11 @@
#
# udf file = <server udf dir>/<package name>.lua
#
# This method is only supported by Aerospike 3 servers.
def execute_udf(key, package_name, function_name, args=[], options = nil)
- policy = create_policy(options, WritePolicy)
+ policy = create_policy(options, WritePolicy, default_write_policy)
command = ExecuteCommand.new(@cluster, policy, key, package_name, function_name, args)
execute_command(command)
record = command.record
@@ -502,11 +517,11 @@
# ExecuteTask instance.
#
# This method is only supported by Aerospike 3 servers.
# If the policy is nil, the default relevant policy will be used.
def execute_udf_on_query(statement, package_name, function_name, function_args=[], options = nil)
- policy = create_policy(options, QueryPolicy)
+ policy = create_policy(options, QueryPolicy, default_query_policy)
nodes = @cluster.nodes
if nodes.empty?
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing UDF failed because cluster is empty.")
end
@@ -538,15 +553,16 @@
# IndexTask instance.
#
# This method is only supported by Aerospike 3 servers.
# index_type should be :string, :numeric or :geo2dsphere (requires server version 3.7 or later)
# collection_type should be :list, :mapkeys or :mapvalues
- def create_index(namespace, set_name, index_name, bin_name, index_type, collection_type=nil, options = nil)
+ def create_index(namespace, set_name, index_name, bin_name, index_type, collection_type = nil, options = nil)
if options.nil? && collection_type.is_a?(Hash)
options, collection_type = collection_type, nil
end
- policy = create_policy(options, WritePolicy)
+ policy = create_policy(options, Policy, default_info_policy)
+
str_cmd = "sindex-create:ns=#{namespace}"
str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty?
str_cmd << ";indexname=#{index_name};numbins=1"
str_cmd << ";indextype=#{collection_type.to_s.upcase}" if collection_type
str_cmd << ";indexdata=#{bin_name},#{index_type.to_s.upcase}"
@@ -568,11 +584,12 @@
end
# Delete secondary index.
# This method is only supported by Aerospike 3 servers.
def drop_index(namespace, set_name, index_name, options = nil)
- policy = create_policy(options, WritePolicy)
+ policy = create_policy(options, Policy, default_info_policy)
+
str_cmd = "sindex-delete:ns=#{namespace}"
str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty?
str_cmd << ";indexname=#{index_name}"
# Send index command to one node. That node will distribute the command to other nodes.
@@ -583,20 +600,21 @@
return if response.start_with?('FAIL:201')
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::INDEX_GENERIC, "Drop index failed: #{response}")
end
- def request_info(*commands)
- @cluster.request_info(@default_policy, *commands)
+ def request_info(*commands, policy: nil)
+ policy = create_policy(policy, Policy, default_info_policy)
+ @cluster.request_info(policy, *commands)
end
#-------------------------------------------------------
# Scan Operations
#-------------------------------------------------------
def scan_all(namespace, set_name, bin_names = nil, options = nil)
- policy = create_policy(options, ScanPolicy)
+ policy = create_policy(options, ScanPolicy, default_scan_policy)
# wait until all migrations are finished
# TODO: implement
# @cluster.WaitUntillMigrationIsFinished(policy.timeout)
@@ -648,11 +666,11 @@
end
# ScanNode reads all records in specified namespace and set, from one node only.
# The policy can be used to specify timeouts.
def scan_node(node, namespace, set_name, bin_names = nil, options = nil)
- policy = create_policy(options, ScanPolicy)
+ policy = create_policy(options, ScanPolicy, default_scan_policy)
# wait until all migrations are finished
# TODO: implement
# @cluster.WaitUntillMigrationIsFinished(policy.timeout)
# Retry policy must be one-shot for scans.
@@ -690,11 +708,11 @@
# record channel.
#
# This method is only supported by Aerospike 3 servers.
# If the policy is nil, a default policy will be generated.
def query(statement, options = nil)
- policy = create_policy(options, QueryPolicy)
+ policy = create_policy(options, QueryPolicy, default_query_policy)
new_policy = policy.clone
nodes = @cluster.nodes
if nodes.empty?
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.")
@@ -726,27 +744,27 @@
#-------------------------------------------------------
# Create user with password and roles. Clear-text password will be hashed using bcrypt
# before sending to server.
def create_user(user, password, roles, options = nil)
- policy = create_policy(options, AdminPolicy)
+ policy = create_policy(options, AdminPolicy, default_admin_policy)
hash = AdminCommand.hash_password(password)
command = AdminCommand.new
command.create_user(@cluster, policy, user, hash, roles)
end
# Remove user from cluster.
def drop_user(user, options = nil)
- policy = create_policy(options, AdminPolicy)
+ policy = create_policy(options, AdminPolicy, default_admin_policy)
command = AdminCommand.new
command.drop_user(@cluster, policy, user)
end
# Change user's password. Clear-text password will be hashed using bcrypt before sending to server.
def change_password(user, password, options = nil)
raise Aerospike::Exceptions::Aerospike.new(INVALID_USER) unless @cluster.user && @cluster.user != ""
- policy = create_policy(options, AdminPolicy)
+ policy = create_policy(options, AdminPolicy, default_admin_policy)
hash = AdminCommand.hash_password(password)
command = AdminCommand.new
if user == @cluster.user
@@ -760,40 +778,49 @@
@cluster.change_password(user, hash)
end
# Add roles to user's list of roles.
def grant_roles(user, roles, options = nil)
- policy = create_policy(options, AdminPolicy)
+ policy = create_policy(options, AdminPolicy, default_admin_policy)
command = AdminCommand.new
command.grant_roles(@cluster, policy, user, roles)
end
# Remove roles from user's list of roles.
def revoke_roles(user, roles, options = nil)
- policy = create_policy(options, AdminPolicy)
+ policy = create_policy(options, AdminPolicy, default_admin_policy)
command = AdminCommand.new
command.revoke_roles(@cluster, policy, user, roles)
end
# Retrieve roles for a given user.
def query_user(user, options = nil)
- policy = create_policy(options, AdminPolicy)
+ policy = create_policy(options, AdminPolicy, default_admin_policy)
command = AdminCommand.new
command.query_user(@cluster, policy, user)
end
# Retrieve all users and their roles.
def query_users(options = nil)
- policy = create_policy(options, AdminPolicy)
+ policy = create_policy(options, AdminPolicy, default_admin_policy)
command = AdminCommand.new
command.query_users(@cluster, policy)
end
private
+ def set_default_policies(policies)
+ self.default_info_policy = create_policy(policies[:info], Policy)
+ self.default_read_policy = create_policy(policies[:read], Policy)
+ self.default_admin_policy = create_policy(policies[:admin], AdminPolicy)
+ self.default_batch_policy = create_policy(policies[:batch], BatchPolicy)
+ self.default_query_policy = create_policy(policies[:query], QueryPolicy)
+ self.default_scan_policy = create_policy(policies[:scan], ScanPolicy)
+ self.default_write_policy = create_policy(policies[:write], WritePolicy)
+ end
+
def send_info_command(policy, command)
- policy ||= default_policy
Aerospike.logger.debug { "Sending info command: #{command}" }
_, response = @cluster.request_info(policy, command).first
response.to_s
end
@@ -808,14 +835,14 @@
Bin.new(k, v)
end
end
end
- def create_policy(policy, policy_klass)
+ def create_policy(policy, policy_klass, default_policy = nil)
case policy
when nil
- policy_klass.new
+ default_policy || policy_klass.new
when policy_klass
policy
when Hash
policy_klass.new(policy)
else
@@ -836,11 +863,11 @@
Aerospike.logger.debug { "Cluster features: #{@cluster.features.get.to_a}" }
validators = []
# guard against unsupported particle types
unsupported_particle_types = []
- unsupported_particle_types << Aerospike::ParticleType::DOUBLE unless @cluster.supports_feature?("float")
- unsupported_particle_types << Aerospike::ParticleType::GEOJSON unless @cluster.supports_feature?("geo")
+ unsupported_particle_types << Aerospike::ParticleType::DOUBLE unless supports_feature?(Aerospike::Features::FLOAT)
+ unsupported_particle_types << Aerospike::ParticleType::GEOJSON unless supports_feature?(Aerospike::Features::GEO)
validators << UnsupportedParticleTypeValidator.new(*unsupported_particle_types) unless unsupported_particle_types.empty?
@command_validators = validators
end