lib/aerospike/client.rb in aerospike-2.1.1 vs lib/aerospike/client.rb in aerospike-2.2.0

- old
+ new

@@ -86,13 +86,11 @@ ## # Returns list of active server node names in the cluster. def node_names - @cluster.nodes.map do |node| - node.get_name - end + @cluster.nodes.map(&:get_name) end def supports_feature?(feature) @cluster.supports_feature?(feature) end @@ -250,15 +248,13 @@ # when a key exists, the corresponding index will be marked true exists_array = Array.new(keys.length) key_map = BatchItem.generate_map(keys) - cmd_gen = Proc.new do |node, bns| + batch_execute(keys) do |node, bns| BatchCommandExists.new(node, bns, policy, key_map, exists_array) end - - batch_execute(keys, &cmd_gen) exists_array end #------------------------------------------------------- # Read Record Operations @@ -302,15 +298,13 @@ # when a key exists, the corresponding index will be set to record records = Array.new(keys.length) key_map = BatchItem.generate_map(keys) - cmd_gen = Proc.new do |node, bns| + batch_execute(keys) do |node, bns| BatchCommandGet.new(node, bns, policy, key_map, bin_names.uniq, records, INFO1_READ) end - - batch_execute(keys, &cmd_gen) records end # Read multiple record header data for specified keys in one batch call. # The returned records are in positional order with the original key array order. @@ -327,28 +321,25 @@ # when a key exists, the corresponding index will be set to record records = Array.new(keys.length) key_map = BatchItem.generate_map(keys) - cmd_gen = Proc.new do |node, bns| + batch_execute(keys) do |node, bns| BatchCommandGet.new(node, bns, policy, key_map, nil, records, INFO1_READ | INFO1_NOBINDATA) end - batch_execute(keys, &cmd_gen) records end #------------------------------------------------------- # Generic Database Operations #------------------------------------------------------- # Perform multiple read/write operations on a single key in one batch call. # An example would be to add an integer value to an existing record and then - # read the result, all in one database call. - # - # Write operations are always performed first, regardless of operation order - # relative to read operations. + # read the result, all in one database call. Operations are executed in + # the order they are specified. def operate(key, operations, options={}) policy = create_policy(options, WritePolicy) command = OperateCommand.new(@cluster, policy, key, operations) execute_command(command) @@ -408,11 +399,11 @@ # RegisterTask instance. # # This method is only supported by Aerospike 3 servers. def register_udf_from_file(client_path, server_path, language, options={}) udf_body = File.read(client_path) - register_udf(udf_body, server_path, language, options={}) + register_udf(udf_body, server_path, language, options) end # Register package containing user defined functions with server. # This asynchronous server call will return before command is complete. # The user can optionally wait for command completion by using the returned @@ -508,26 +499,21 @@ command = ExecuteCommand.new(@cluster, policy, key, package_name, function_name, args) execute_command(command) record = command.record - return nil if !record || record.bins.length == 0 + return nil if !record || record.bins.empty? result_map = record.bins # User defined functions don't have to return a value. - key, obj = result_map.detect{|k, v| k.include?('SUCCESS')} - if key - return obj - end + key, obj = result_map.detect{ |k, _| k.include?('SUCCESS') } + return obj if key - key, obj = result_map.detect{|k, v| k.include?('FAILURE')} - if key - raise Aerospike::Exceptions::Aerospike.new(UDF_BAD_RESPONSE, "#{obj}") - end - - raise Aerospike::Exceptions::Aerospike.new(UDF_BAD_RESPONSE, "Invalid UDF return value") + key, obj = result_map.detect{ |k, _| k.include?('FAILURE') } + message = key ? obj.to_s : "Invalid UDF return value" + raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::UDF_BAD_RESPONSE, message) end # execute_udf_on_query applies user defined function on records that match the statement filter. # Records are not returned to the client. # This asynchronous server call will return before command is complete. @@ -538,11 +524,11 @@ # If the policy is nil, the default relevant policy will be used. def execute_udf_on_query(statement, package_name, function_name, function_args=[], options={}) policy = create_policy(options, QueryPolicy) nodes = @cluster.nodes - if nodes.length == 0 + if nodes.empty? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing UDF failed because cluster is empty.") end # TODO: wait until all migrations are finished statement.set_aggregate_function(package_name, function_name, function_args, false) @@ -642,11 +628,11 @@ # Retry policy must be one-shot for scans. # copy on write for policy new_policy = policy.clone nodes = @cluster.nodes - if nodes.length == 0 + if nodes.empty? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.") end recordset = Recordset.new(policy.record_queue_size, nodes.length, :scan) @@ -697,11 +683,11 @@ # Retry policy must be one-shot for scans. # copy on write for policy new_policy = policy.clone new_policy.max_retries = 0 - node = @cluster.get_node_by_name(node) if !node.is_a?(Aerospike::Node) + node = @cluster.get_node_by_name(node) unless node.is_a?(Aerospike::Node) recordset = Recordset.new(policy.record_queue_size, 1, :scan) Thread.new do Thread.current.abort_on_exception = true @@ -733,11 +719,11 @@ def query(statement, options={}) policy = create_policy(options, QueryPolicy) new_policy = policy.clone nodes = @cluster.nodes - if nodes.length == 0 + if nodes.empty? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.") end recordset = Recordset.new(policy.record_queue_size, nodes.length, :query) @@ -780,14 +766,12 @@ command.drop_user(@cluster, policy, user) end # Change user's password. Clear-text password will be hashed using bcrypt before sending to server. def change_password(user, password, options={}) + raise Aerospike::Exceptions::Aerospike.new(INVALID_USER) unless @cluster.user && @cluster.user != "" policy = create_policy(options, AdminPolicy) - if @cluster.user == '' - return NewAerospikeError(INVALID_USER) - end hash = AdminCommand.hash_password(password) command = AdminCommand.new if user == @cluster.user @@ -830,11 +814,12 @@ end private def send_info_command(policy, command) - @cluster.request_info(@default_policy, command) + policy ||= default_policy + @cluster.request_info(policy, command) end def hash_to_bins(hash) if hash.is_a?(Bin) [hash] @@ -910,28 +895,28 @@ def execute_command(command) validate_command(command) command.execute end - def batch_execute(keys, &cmd_gen) + def batch_execute(keys) batch_nodes = BatchNode.generate_list(@cluster, keys) threads = [] # Use a thread per namespace per node batch_nodes.each do |batch_node| # copy to avoid race condition bn = batch_node bn.batch_namespaces.each do |bns| threads << Thread.new do Thread.current.abort_on_exception = true - command = cmd_gen.call(bn.node, bns) + command = yield bn.node, bns execute_command(command) end end end - threads.each { |thr| thr.join } + threads.each(&:join) end end # class -end #module +end # module