lib/aerospike/client.rb in aerospike-0.1.6 vs lib/aerospike/client.rb in aerospike-1.0.0
- old
+ new
@@ -34,17 +34,19 @@
# #=> raises Aerospike::Exceptions::Timeout if a +:timeout+ is specified and
# +:fail_if_not_connected+ set to true
class Client
- attr_accessor :default_policy, :default_write_policy
+ attr_accessor :default_policy, :default_write_policy,
+ :default_scan_policy, :default_query_policy, :default_admin_policy
def initialize(host, port, options={})
@default_policy = Policy.new
@default_write_policy = WritePolicy.new
@default_scan_policy = ScanPolicy.new
@default_query_policy = QueryPolicy.new
+ @default_admin_policy = QueryPolicy.new
policy = opt_to_client_policy(options)
@cluster = Cluster.new(policy, Host.new(host, port))
@@ -592,11 +594,11 @@
abort_on_exception = true
command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset)
begin
command.execute
rescue => e
- Aerospike.logger.error(e) unless e == Rescordset::SCAN_TERMINATED_EXCEPTION
+ Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION
recordset.cancel(e)
ensure
recordset.thread_finished
end
end
@@ -607,11 +609,11 @@
nodes.each do |node|
command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset)
begin
command.execute
rescue => e
- Aerospike.logger.error(e) unless e == Rescordset::SCAN_TERMINATED_EXCEPTION
+ Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION
recordset.cancel(e)
ensure
recordset.thread_finished
end
end
@@ -642,11 +644,11 @@
abort_on_exception = true
command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset)
begin
command.execute
rescue => e
- Aerospike.logger.error(e) unless e == Rescordset::SCAN_TERMINATED_EXCEPTION
+ Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION
recordset.cancel(e)
ensure
recordset.thread_finished
end
end
@@ -682,21 +684,97 @@
abort_on_exception = true
command = QueryCommand.new(node, new_policy, statement, recordset)
begin
command.execute
rescue => e
- Aerospike.logger.error(e) unless e == Rescordset::QUERY_TERMINATED_EXCEPTION
+ Aerospike.logger.error(e.backtrace.join("\n")) unless e == QUERY_TERMINATED_EXCEPTION
recordset.cancel(e)
ensure
recordset.thread_finished
end
end
end
recordset
end
+ #-------------------------------------------------------
+ # User administration
+ #-------------------------------------------------------
+
+ # Create user with password and roles. Clear-text password will be hashed using bcrypt
+ # before sending to server.
+ def create_user(user, password, roles, options={})
+ policy = opt_to_admin_policy(options)
+ hash = AdminCommand.hash_password(password)
+ command = AdminCommand.new
+ command.create_user(@cluster, policy, user, hash, roles)
+ end
+
+ # Remove user from cluster.
+ def drop_user(user, options={})
+ policy = opt_to_admin_policy(options)
+ command = AdminCommand.new
+ command.drop_user(@cluster, policy, user)
+ end
+
+ # Change user's password. Clear-text password will be hashed using bcrypt before sending to server.
+ def change_password(user, password, options={})
+ policy = opt_to_admin_policy(options)
+ if @cluster.user == ''
+ return NewAerospikeError(INVALID_USER)
+ end
+
+ hash = AdminCommand.hash_password(password)
+ command = AdminCommand.new
+
+ if user == @cluster.user
+ # Change own password.
+ command.change_password(@cluster, policy, user, hash)
+ else
+ # Change other user's password by user admin.
+ command.set_password(@cluster, policy, user, hash)
+ end
+
+ @cluster.change_password(user, hash)
+ end
+
+ # Add roles to user's list of roles.
+ def grant_roles(user, roles, options={})
+ policy = opt_to_admin_policy(options)
+ command = AdminCommand.new
+ command.grant_roles(@cluster, policy, user, roles)
+ end
+
+ # Remove roles from user's list of roles.
+ def revoke_roles(user, roles, options={})
+ policy = opt_to_admin_policy(options)
+ command = AdminCommand.new
+ command.revoke_roles(@cluster, policy, user, roles)
+ end
+
+ # Replace user's list of roles.
+ def replace_roles(user, roles, options={})
+ policy = opt_to_admin_policy(options)
+ command = AdminCommand.new
+ command.replace_roles(@cluster, policy, user, roles)
+ end
+
+ # Retrieve roles for a given user.
+ def query_user(user, options={})
+ policy = opt_to_admin_policy(options)
+ command = AdminCommand.new
+ command.query_user(@cluster, policy, user)
+ end
+
+ # Retrieve all users and their roles.
+ def query_users(options={})
+ policy = opt_to_admin_policy(options)
+ command = AdminCommand.new
+ command.query_users(@cluster, policy)
+ end
+
private
def send_info_command(policy, command)
@cluster.request_info(@default_policy, command)
end
@@ -718,72 +796,60 @@
if options.nil? || options == {}
ClientPolicy.new
elsif options.is_a?(ClientPolicy)
options
elsif options.is_a?(Hash)
- ClientPolicy.new(
- options[:timeout],
- options[:connection_queue_size],
- options[:fail_if_not_connected]
- )
+ ClientPolicy.new(options)
end
end
def opt_to_policy(options)
if options.nil? || options == {}
@default_policy
elsif options.is_a?(Policy)
options
elsif options.is_a?(Hash)
- Policy.new(
- options[:priority],
- options[:timeout],
- options[:max_retiries],
- options[:sleep_between_retries],
- options[:consistency_level]
- )
+ Policy.new(options)
end
end
def opt_to_write_policy(options)
if options.nil? || options == {}
@default_write_policy
elsif options.is_a?(WritePolicy)
options
elsif options.is_a?(Hash)
- WritePolicy.new(
- options[:record_exists_action],
- options[:gen_policy],
- options[:generation],
- options[:expiration],
- options[:send_key],
- options[:commit_level]
- )
+ WritePolicy.new(options)
end
end
def opt_to_scan_policy(options)
if options.nil? || options == {}
@default_scan_policy
elsif options.is_a?(ScanPolicy)
options
elsif options.is_a?(Hash)
- ScanPolicy.new(
- options[:scan_percent],
- options[:concurrent_nodes],
- options[:include_bin_data],
- options[:fail_on_cluster_change]
- )
+ ScanPolicy.new(options)
end
end
def opt_to_query_policy(options)
if options.nil? || options == {}
@default_query_policy
elsif options.is_a?(QueryPolicy)
options
elsif options.is_a?(Hash)
- QueryPolicy.new()
+ QueryPolicy.new(options)
+ end
+ end
+
+ def opt_to_admin_policy(options)
+ if options.nil? || options == {}
+ @default_admin_policy
+ elsif options.is_a?(AdminPolicy)
+ options
+ elsif options.is_a?(Hash)
+ AdminPolicy.new(options)
end
end
def batch_execute(keys, &cmd_gen)
batch_nodes = BatchNode.generate_list(@cluster, keys)