lib/aerospike/client.rb in aerospike-0.1.6 vs lib/aerospike/client.rb in aerospike-1.0.0

- old
+ new

@@ -34,17 +34,19 @@ # #=> raises Aerospike::Exceptions::Timeout if a +:timeout+ is specified and # +:fail_if_not_connected+ set to true class Client - attr_accessor :default_policy, :default_write_policy + attr_accessor :default_policy, :default_write_policy, + :default_scan_policy, :default_query_policy, :default_admin_policy def initialize(host, port, options={}) @default_policy = Policy.new @default_write_policy = WritePolicy.new @default_scan_policy = ScanPolicy.new @default_query_policy = QueryPolicy.new + @default_admin_policy = QueryPolicy.new policy = opt_to_client_policy(options) @cluster = Cluster.new(policy, Host.new(host, port)) @@ -592,11 +594,11 @@ abort_on_exception = true command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset) begin command.execute rescue => e - Aerospike.logger.error(e) unless e == Rescordset::SCAN_TERMINATED_EXCEPTION + Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION recordset.cancel(e) ensure recordset.thread_finished end end @@ -607,11 +609,11 @@ nodes.each do |node| command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset) begin command.execute rescue => e - Aerospike.logger.error(e) unless e == Rescordset::SCAN_TERMINATED_EXCEPTION + Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION recordset.cancel(e) ensure recordset.thread_finished end end @@ -642,11 +644,11 @@ abort_on_exception = true command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset) begin command.execute rescue => e - Aerospike.logger.error(e) unless e == Rescordset::SCAN_TERMINATED_EXCEPTION + Aerospike.logger.error(e.backtrace.join("\n")) unless e == SCAN_TERMINATED_EXCEPTION recordset.cancel(e) ensure recordset.thread_finished end end @@ -682,21 +684,97 @@ abort_on_exception = true command = QueryCommand.new(node, new_policy, statement, recordset) begin command.execute rescue => e - Aerospike.logger.error(e) unless e == Rescordset::QUERY_TERMINATED_EXCEPTION + Aerospike.logger.error(e.backtrace.join("\n")) unless e == QUERY_TERMINATED_EXCEPTION recordset.cancel(e) ensure recordset.thread_finished end end end recordset end + #------------------------------------------------------- + # User administration + #------------------------------------------------------- + + # Create user with password and roles. Clear-text password will be hashed using bcrypt + # before sending to server. + def create_user(user, password, roles, options={}) + policy = opt_to_admin_policy(options) + hash = AdminCommand.hash_password(password) + command = AdminCommand.new + command.create_user(@cluster, policy, user, hash, roles) + end + + # Remove user from cluster. + def drop_user(user, options={}) + policy = opt_to_admin_policy(options) + command = AdminCommand.new + command.drop_user(@cluster, policy, user) + end + + # Change user's password. Clear-text password will be hashed using bcrypt before sending to server. + def change_password(user, password, options={}) + policy = opt_to_admin_policy(options) + if @cluster.user == '' + return NewAerospikeError(INVALID_USER) + end + + hash = AdminCommand.hash_password(password) + command = AdminCommand.new + + if user == @cluster.user + # Change own password. + command.change_password(@cluster, policy, user, hash) + else + # Change other user's password by user admin. + command.set_password(@cluster, policy, user, hash) + end + + @cluster.change_password(user, hash) + end + + # Add roles to user's list of roles. + def grant_roles(user, roles, options={}) + policy = opt_to_admin_policy(options) + command = AdminCommand.new + command.grant_roles(@cluster, policy, user, roles) + end + + # Remove roles from user's list of roles. + def revoke_roles(user, roles, options={}) + policy = opt_to_admin_policy(options) + command = AdminCommand.new + command.revoke_roles(@cluster, policy, user, roles) + end + + # Replace user's list of roles. + def replace_roles(user, roles, options={}) + policy = opt_to_admin_policy(options) + command = AdminCommand.new + command.replace_roles(@cluster, policy, user, roles) + end + + # Retrieve roles for a given user. + def query_user(user, options={}) + policy = opt_to_admin_policy(options) + command = AdminCommand.new + command.query_user(@cluster, policy, user) + end + + # Retrieve all users and their roles. + def query_users(options={}) + policy = opt_to_admin_policy(options) + command = AdminCommand.new + command.query_users(@cluster, policy) + end + private def send_info_command(policy, command) @cluster.request_info(@default_policy, command) end @@ -718,72 +796,60 @@ if options.nil? || options == {} ClientPolicy.new elsif options.is_a?(ClientPolicy) options elsif options.is_a?(Hash) - ClientPolicy.new( - options[:timeout], - options[:connection_queue_size], - options[:fail_if_not_connected] - ) + ClientPolicy.new(options) end end def opt_to_policy(options) if options.nil? || options == {} @default_policy elsif options.is_a?(Policy) options elsif options.is_a?(Hash) - Policy.new( - options[:priority], - options[:timeout], - options[:max_retiries], - options[:sleep_between_retries], - options[:consistency_level] - ) + Policy.new(options) end end def opt_to_write_policy(options) if options.nil? || options == {} @default_write_policy elsif options.is_a?(WritePolicy) options elsif options.is_a?(Hash) - WritePolicy.new( - options[:record_exists_action], - options[:gen_policy], - options[:generation], - options[:expiration], - options[:send_key], - options[:commit_level] - ) + WritePolicy.new(options) end end def opt_to_scan_policy(options) if options.nil? || options == {} @default_scan_policy elsif options.is_a?(ScanPolicy) options elsif options.is_a?(Hash) - ScanPolicy.new( - options[:scan_percent], - options[:concurrent_nodes], - options[:include_bin_data], - options[:fail_on_cluster_change] - ) + ScanPolicy.new(options) end end def opt_to_query_policy(options) if options.nil? || options == {} @default_query_policy elsif options.is_a?(QueryPolicy) options elsif options.is_a?(Hash) - QueryPolicy.new() + QueryPolicy.new(options) + end + end + + def opt_to_admin_policy(options) + if options.nil? || options == {} + @default_admin_policy + elsif options.is_a?(AdminPolicy) + options + elsif options.is_a?(Hash) + AdminPolicy.new(options) end end def batch_execute(keys, &cmd_gen) batch_nodes = BatchNode.generate_list(@cluster, keys)