lib/aerospike/client.rb in aerospike-2.24.0 vs lib/aerospike/client.rb in aerospike-2.25.0
- old
+ new
@@ -13,12 +13,12 @@
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
-require 'digest'
-require 'base64'
+require "digest"
+require "base64"
module Aerospike
##
# Client class manages the Aerospike cluster nodes under the hood, and
@@ -34,11 +34,10 @@
#
# #=> raises Aerospike::Exceptions::Timeout if a +:timeout+ is specified and
# +:fail_if_not_connected+ set to true
class Client
-
attr_accessor :default_admin_policy
attr_accessor :default_batch_policy
attr_accessor :default_info_policy
attr_accessor :default_query_policy
attr_accessor :default_read_policy
@@ -46,12 +45,11 @@
attr_accessor :default_write_policy
attr_accessor :default_operate_policy
attr_accessor :cluster
def initialize(hosts = nil, policy: ClientPolicy.new, connect: true)
-
- hosts = ::Aerospike::Host::Parse.(hosts || ENV['AEROSPIKE_HOSTS'] || 'localhost')
+ hosts = ::Aerospike::Host::Parse.(hosts || ENV["AEROSPIKE_HOSTS"] || "localhost")
policy = create_policy(policy, ClientPolicy)
set_default_policies(policy.policies)
@cluster = Cluster.new(policy, hosts)
@cluster.add_cluster_config_change_listener(self)
@@ -247,11 +245,11 @@
# Servers >= 4.3.1.4 require lut argument
str_cmd << ";lut=now"
end
response = send_info_command(policy, str_cmd, node).upcase
- return if response == 'OK'
+ return if response == "OK"
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_ERROR, "Truncate failed: #{response}")
end
#-------------------------------------------------------
# Touch Operations
@@ -384,11 +382,12 @@
# read the result, all in one database call. Operations are executed in
# the order they are specified.
def operate(key, operations, options = nil)
policy = create_policy(options, OperatePolicy, default_operate_policy)
- command = OperateCommand.new(@cluster, policy, key, operations)
+ args = OperateArgs.new(cluster, policy, default_write_policy, default_operate_policy, key, operations)
+ command = OperateCommand.new(@cluster, key, args)
execute_command(command)
command.record
end
#---------------------------------------------------------------
@@ -413,28 +412,28 @@
#
# This method is only supported by Aerospike 3 servers.
def register_udf(udf_body, server_path, language, options = nil)
policy = create_policy(options, Policy, default_info_policy)
- content = Base64.strict_encode64(udf_body).force_encoding('binary')
+ content = Base64.strict_encode64(udf_body).force_encoding("binary")
str_cmd = "udf-put:filename=#{server_path};content=#{content};"
str_cmd << "content-len=#{content.length};udf-type=#{language};"
# Send UDF to one node. That node will distribute the UDF to other nodes.
response_map = @cluster.request_info(policy, str_cmd)
res = {}
response_map.each do |k, response|
- vals = response.to_s.split(';')
+ vals = response.to_s.split(";")
vals.each do |pair|
k, v = pair.split("=", 2)
res[k] = v
end
end
- if res['error']
- raise Aerospike::Exceptions::CommandRejected.new("Registration failed: #{res['error']}\nFile: #{res['file']}\nLine: #{res['line']}\nMessage: #{res['message']}")
+ if res["error"]
+ raise Aerospike::Exceptions::CommandRejected.new("Registration failed: #{res["error"]}\nFile: #{res["file"]}\nLine: #{res["line"]}\nMessage: #{res["message"]}")
end
UdfRegisterTask.new(@cluster, server_path)
end
@@ -452,11 +451,11 @@
# Send command to one node. That node will distribute it to other nodes.
# Send UDF to one node. That node will distribute the UDF to other nodes.
response_map = @cluster.request_info(policy, str_cmd)
_, response = response_map.first
- if response == 'ok'
+ if response == "ok"
UdfRemoveTask.new(@cluster, udf_name)
else
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_ERROR, response)
end
end
@@ -464,31 +463,31 @@
# ListUDF lists all packages containing user defined functions in the server.
# This method is only supported by Aerospike 3 servers.
def list_udf(options = nil)
policy = create_policy(options, Policy, default_info_policy)
- str_cmd = 'udf-list'
+ str_cmd = "udf-list"
# Send command to one node. That node will distribute it to other nodes.
response_map = @cluster.request_info(policy, str_cmd)
_, response = response_map.first
- vals = response.split(';')
+ vals = response.split(";")
vals.map do |udf_info|
- next if udf_info.strip! == ''
+ next if udf_info.strip! == ""
- udf_parts = udf_info.split(',')
+ udf_parts = udf_info.split(",")
udf = UDF.new
udf_parts.each do |values|
- k, v = values.split('=', 2)
+ k, v = values.split("=", 2)
case k
- when 'filename'
+ when "filename"
udf.filename = v
- when 'hash'
+ when "hash"
udf.hash = v
- when 'type'
+ when "type"
udf.language = v
end
end
udf
end
@@ -499,11 +498,11 @@
# The package name is used to locate the udf file location:
#
# udf file = <server udf dir>/<package name>.lua
#
# This method is only supported by Aerospike 3 servers.
- def execute_udf(key, package_name, function_name, args=[], options = nil)
+ def execute_udf(key, package_name, function_name, args = [], options = nil)
policy = create_policy(options, WritePolicy, default_write_policy)
command = ExecuteCommand.new(@cluster, policy, key, package_name, function_name, args)
execute_command(command)
@@ -512,14 +511,14 @@
return nil if !record || record.bins.empty?
result_map = record.bins
# User defined functions don't have to return a value.
- key, obj = result_map.detect{ |k, _| k.include?('SUCCESS') }
+ key, obj = result_map.detect { |k, _| k.include?("SUCCESS") }
return obj if key
- key, obj = result_map.detect{ |k, _| k.include?('FAILURE') }
+ key, obj = result_map.detect { |k, _| k.include?("FAILURE") }
message = key ? obj.to_s : "Invalid UDF return value"
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::UDF_BAD_RESPONSE, message)
end
# execute_udf_on_query applies user defined function on records that match the statement filter.
@@ -528,11 +527,11 @@
# The user can optionally wait for command completion by using the returned
# ExecuteTask instance.
#
# This method is only supported by Aerospike 3 servers.
# If the policy is nil, the default relevant policy will be used.
- def execute_udf_on_query(statement, package_name, function_name, function_args=[], options = nil)
+ def execute_udf_on_query(statement, package_name, function_name, function_args = [], options = nil)
policy = create_policy(options, QueryPolicy, default_query_policy)
nodes = @cluster.nodes
if nodes.empty?
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing UDF failed because cluster is empty.")
@@ -557,11 +556,10 @@
end
ExecuteTask.new(@cluster, statement)
end
-
# Create secondary index.
# This asynchronous server call will return before command is complete.
# The user can optionally wait for command completion by using the returned
# IndexTask instance.
#
@@ -581,16 +579,16 @@
str_cmd << ";indexdata=#{bin_name},#{index_type.to_s.upcase}"
str_cmd << ";priority=normal"
# Send index command to one node. That node will distribute the command to other nodes.
response = send_info_command(policy, str_cmd).upcase
- if response == 'OK'
+ if response == "OK"
# Return task that could optionally be polled for completion.
return IndexTask.new(@cluster, namespace, index_name)
end
- if response.start_with?('FAIL:200')
+ if response.start_with?("FAIL:200")
# Index has already been created. Do not need to poll for completion.
return IndexTask.new(@cluster, namespace, index_name, true)
end
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::INDEX_GENERIC, "Create index failed: #{response}")
@@ -605,14 +603,14 @@
str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty?
str_cmd << ";indexname=#{index_name}"
# Send index command to one node. That node will distribute the command to other nodes.
response = send_info_command(policy, str_cmd).upcase
- return if response == 'OK'
+ return if response == "OK"
# Index did not previously exist. Return without error.
- return if response.start_with?('FAIL:201')
+ return if response.start_with?("FAIL:201")
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::INDEX_GENERIC, "Drop index failed: #{response}")
end
def request_info(*commands, policy: nil)
@@ -964,9 +962,7 @@
end
end
threads.each(&:join)
end
-
end # class
-
end # module