lib/aerospike/client.rb in aerospike-2.1.1 vs lib/aerospike/client.rb in aerospike-2.2.0
- old
+ new
@@ -86,13 +86,11 @@
##
# Returns list of active server node names in the cluster.
def node_names
- @cluster.nodes.map do |node|
- node.get_name
- end
+ @cluster.nodes.map(&:get_name)
end
def supports_feature?(feature)
@cluster.supports_feature?(feature)
end
@@ -250,15 +248,13 @@
# when a key exists, the corresponding index will be marked true
exists_array = Array.new(keys.length)
key_map = BatchItem.generate_map(keys)
- cmd_gen = Proc.new do |node, bns|
+ batch_execute(keys) do |node, bns|
BatchCommandExists.new(node, bns, policy, key_map, exists_array)
end
-
- batch_execute(keys, &cmd_gen)
exists_array
end
#-------------------------------------------------------
# Read Record Operations
@@ -302,15 +298,13 @@
# when a key exists, the corresponding index will be set to record
records = Array.new(keys.length)
key_map = BatchItem.generate_map(keys)
- cmd_gen = Proc.new do |node, bns|
+ batch_execute(keys) do |node, bns|
BatchCommandGet.new(node, bns, policy, key_map, bin_names.uniq, records, INFO1_READ)
end
-
- batch_execute(keys, &cmd_gen)
records
end
# Read multiple record header data for specified keys in one batch call.
# The returned records are in positional order with the original key array order.
@@ -327,28 +321,25 @@
# when a key exists, the corresponding index will be set to record
records = Array.new(keys.length)
key_map = BatchItem.generate_map(keys)
- cmd_gen = Proc.new do |node, bns|
+ batch_execute(keys) do |node, bns|
BatchCommandGet.new(node, bns, policy, key_map, nil, records, INFO1_READ | INFO1_NOBINDATA)
end
- batch_execute(keys, &cmd_gen)
records
end
#-------------------------------------------------------
# Generic Database Operations
#-------------------------------------------------------
# Perform multiple read/write operations on a single key in one batch call.
# An example would be to add an integer value to an existing record and then
- # read the result, all in one database call.
- #
- # Write operations are always performed first, regardless of operation order
- # relative to read operations.
+ # read the result, all in one database call. Operations are executed in
+ # the order they are specified.
def operate(key, operations, options={})
policy = create_policy(options, WritePolicy)
command = OperateCommand.new(@cluster, policy, key, operations)
execute_command(command)
@@ -408,11 +399,11 @@
# RegisterTask instance.
#
# This method is only supported by Aerospike 3 servers.
def register_udf_from_file(client_path, server_path, language, options={})
udf_body = File.read(client_path)
- register_udf(udf_body, server_path, language, options={})
+ register_udf(udf_body, server_path, language, options)
end
# Register package containing user defined functions with server.
# This asynchronous server call will return before command is complete.
# The user can optionally wait for command completion by using the returned
@@ -508,26 +499,21 @@
command = ExecuteCommand.new(@cluster, policy, key, package_name, function_name, args)
execute_command(command)
record = command.record
- return nil if !record || record.bins.length == 0
+ return nil if !record || record.bins.empty?
result_map = record.bins
# User defined functions don't have to return a value.
- key, obj = result_map.detect{|k, v| k.include?('SUCCESS')}
- if key
- return obj
- end
+ key, obj = result_map.detect{ |k, _| k.include?('SUCCESS') }
+ return obj if key
- key, obj = result_map.detect{|k, v| k.include?('FAILURE')}
- if key
- raise Aerospike::Exceptions::Aerospike.new(UDF_BAD_RESPONSE, "#{obj}")
- end
-
- raise Aerospike::Exceptions::Aerospike.new(UDF_BAD_RESPONSE, "Invalid UDF return value")
+ key, obj = result_map.detect{ |k, _| k.include?('FAILURE') }
+ message = key ? obj.to_s : "Invalid UDF return value"
+ raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::UDF_BAD_RESPONSE, message)
end
# execute_udf_on_query applies user defined function on records that match the statement filter.
# Records are not returned to the client.
# This asynchronous server call will return before command is complete.
@@ -538,11 +524,11 @@
# If the policy is nil, the default relevant policy will be used.
def execute_udf_on_query(statement, package_name, function_name, function_args=[], options={})
policy = create_policy(options, QueryPolicy)
nodes = @cluster.nodes
- if nodes.length == 0
+ if nodes.empty?
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing UDF failed because cluster is empty.")
end
# TODO: wait until all migrations are finished
statement.set_aggregate_function(package_name, function_name, function_args, false)
@@ -642,11 +628,11 @@
# Retry policy must be one-shot for scans.
# copy on write for policy
new_policy = policy.clone
nodes = @cluster.nodes
- if nodes.length == 0
+ if nodes.empty?
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.")
end
recordset = Recordset.new(policy.record_queue_size, nodes.length, :scan)
@@ -697,11 +683,11 @@
# Retry policy must be one-shot for scans.
# copy on write for policy
new_policy = policy.clone
new_policy.max_retries = 0
- node = @cluster.get_node_by_name(node) if !node.is_a?(Aerospike::Node)
+ node = @cluster.get_node_by_name(node) unless node.is_a?(Aerospike::Node)
recordset = Recordset.new(policy.record_queue_size, 1, :scan)
Thread.new do
Thread.current.abort_on_exception = true
@@ -733,11 +719,11 @@
def query(statement, options={})
policy = create_policy(options, QueryPolicy)
new_policy = policy.clone
nodes = @cluster.nodes
- if nodes.length == 0
+ if nodes.empty?
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.")
end
recordset = Recordset.new(policy.record_queue_size, nodes.length, :query)
@@ -780,14 +766,12 @@
command.drop_user(@cluster, policy, user)
end
# Change user's password. Clear-text password will be hashed using bcrypt before sending to server.
def change_password(user, password, options={})
+ raise Aerospike::Exceptions::Aerospike.new(INVALID_USER) unless @cluster.user && @cluster.user != ""
policy = create_policy(options, AdminPolicy)
- if @cluster.user == ''
- return NewAerospikeError(INVALID_USER)
- end
hash = AdminCommand.hash_password(password)
command = AdminCommand.new
if user == @cluster.user
@@ -830,11 +814,12 @@
end
private
def send_info_command(policy, command)
- @cluster.request_info(@default_policy, command)
+ policy ||= default_policy
+ @cluster.request_info(policy, command)
end
def hash_to_bins(hash)
if hash.is_a?(Bin)
[hash]
@@ -910,28 +895,28 @@
def execute_command(command)
validate_command(command)
command.execute
end
- def batch_execute(keys, &cmd_gen)
+ def batch_execute(keys)
batch_nodes = BatchNode.generate_list(@cluster, keys)
threads = []
# Use a thread per namespace per node
batch_nodes.each do |batch_node|
# copy to avoid race condition
bn = batch_node
bn.batch_namespaces.each do |bns|
threads << Thread.new do
Thread.current.abort_on_exception = true
- command = cmd_gen.call(bn.node, bns)
+ command = yield bn.node, bns
execute_command(command)
end
end
end
- threads.each { |thr| thr.join }
+ threads.each(&:join)
end
end # class
-end #module
+end # module