lib/aerospike/client.rb in aerospike-2.24.0 vs lib/aerospike/client.rb in aerospike-2.25.0

- old
+ new

@@ -13,12 +13,12 @@ # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. -require 'digest' -require 'base64' +require "digest" +require "base64" module Aerospike ## # Client class manages the Aerospike cluster nodes under the hood, and @@ -34,11 +34,10 @@ # # #=> raises Aerospike::Exceptions::Timeout if a +:timeout+ is specified and # +:fail_if_not_connected+ set to true class Client - attr_accessor :default_admin_policy attr_accessor :default_batch_policy attr_accessor :default_info_policy attr_accessor :default_query_policy attr_accessor :default_read_policy @@ -46,12 +45,11 @@ attr_accessor :default_write_policy attr_accessor :default_operate_policy attr_accessor :cluster def initialize(hosts = nil, policy: ClientPolicy.new, connect: true) - - hosts = ::Aerospike::Host::Parse.(hosts || ENV['AEROSPIKE_HOSTS'] || 'localhost') + hosts = ::Aerospike::Host::Parse.(hosts || ENV["AEROSPIKE_HOSTS"] || "localhost") policy = create_policy(policy, ClientPolicy) set_default_policies(policy.policies) @cluster = Cluster.new(policy, hosts) @cluster.add_cluster_config_change_listener(self) @@ -247,11 +245,11 @@ # Servers >= 4.3.1.4 require lut argument str_cmd << ";lut=now" end response = send_info_command(policy, str_cmd, node).upcase - return if response == 'OK' + return if response == "OK" raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_ERROR, "Truncate failed: #{response}") end #------------------------------------------------------- # Touch Operations @@ -384,11 +382,12 @@ # read the result, all in one database call. Operations are executed in # the order they are specified. def operate(key, operations, options = nil) policy = create_policy(options, OperatePolicy, default_operate_policy) - command = OperateCommand.new(@cluster, policy, key, operations) + args = OperateArgs.new(cluster, policy, default_write_policy, default_operate_policy, key, operations) + command = OperateCommand.new(@cluster, key, args) execute_command(command) command.record end #--------------------------------------------------------------- @@ -413,28 +412,28 @@ # # This method is only supported by Aerospike 3 servers. def register_udf(udf_body, server_path, language, options = nil) policy = create_policy(options, Policy, default_info_policy) - content = Base64.strict_encode64(udf_body).force_encoding('binary') + content = Base64.strict_encode64(udf_body).force_encoding("binary") str_cmd = "udf-put:filename=#{server_path};content=#{content};" str_cmd << "content-len=#{content.length};udf-type=#{language};" # Send UDF to one node. That node will distribute the UDF to other nodes. response_map = @cluster.request_info(policy, str_cmd) res = {} response_map.each do |k, response| - vals = response.to_s.split(';') + vals = response.to_s.split(";") vals.each do |pair| k, v = pair.split("=", 2) res[k] = v end end - if res['error'] - raise Aerospike::Exceptions::CommandRejected.new("Registration failed: #{res['error']}\nFile: #{res['file']}\nLine: #{res['line']}\nMessage: #{res['message']}") + if res["error"] + raise Aerospike::Exceptions::CommandRejected.new("Registration failed: #{res["error"]}\nFile: #{res["file"]}\nLine: #{res["line"]}\nMessage: #{res["message"]}") end UdfRegisterTask.new(@cluster, server_path) end @@ -452,11 +451,11 @@ # Send command to one node. That node will distribute it to other nodes. # Send UDF to one node. That node will distribute the UDF to other nodes. response_map = @cluster.request_info(policy, str_cmd) _, response = response_map.first - if response == 'ok' + if response == "ok" UdfRemoveTask.new(@cluster, udf_name) else raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_ERROR, response) end end @@ -464,31 +463,31 @@ # ListUDF lists all packages containing user defined functions in the server. # This method is only supported by Aerospike 3 servers. def list_udf(options = nil) policy = create_policy(options, Policy, default_info_policy) - str_cmd = 'udf-list' + str_cmd = "udf-list" # Send command to one node. That node will distribute it to other nodes. response_map = @cluster.request_info(policy, str_cmd) _, response = response_map.first - vals = response.split(';') + vals = response.split(";") vals.map do |udf_info| - next if udf_info.strip! == '' + next if udf_info.strip! == "" - udf_parts = udf_info.split(',') + udf_parts = udf_info.split(",") udf = UDF.new udf_parts.each do |values| - k, v = values.split('=', 2) + k, v = values.split("=", 2) case k - when 'filename' + when "filename" udf.filename = v - when 'hash' + when "hash" udf.hash = v - when 'type' + when "type" udf.language = v end end udf end @@ -499,11 +498,11 @@ # The package name is used to locate the udf file location: # # udf file = <server udf dir>/<package name>.lua # # This method is only supported by Aerospike 3 servers. - def execute_udf(key, package_name, function_name, args=[], options = nil) + def execute_udf(key, package_name, function_name, args = [], options = nil) policy = create_policy(options, WritePolicy, default_write_policy) command = ExecuteCommand.new(@cluster, policy, key, package_name, function_name, args) execute_command(command) @@ -512,14 +511,14 @@ return nil if !record || record.bins.empty? result_map = record.bins # User defined functions don't have to return a value. - key, obj = result_map.detect{ |k, _| k.include?('SUCCESS') } + key, obj = result_map.detect { |k, _| k.include?("SUCCESS") } return obj if key - key, obj = result_map.detect{ |k, _| k.include?('FAILURE') } + key, obj = result_map.detect { |k, _| k.include?("FAILURE") } message = key ? obj.to_s : "Invalid UDF return value" raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::UDF_BAD_RESPONSE, message) end # execute_udf_on_query applies user defined function on records that match the statement filter. @@ -528,11 +527,11 @@ # The user can optionally wait for command completion by using the returned # ExecuteTask instance. # # This method is only supported by Aerospike 3 servers. # If the policy is nil, the default relevant policy will be used. - def execute_udf_on_query(statement, package_name, function_name, function_args=[], options = nil) + def execute_udf_on_query(statement, package_name, function_name, function_args = [], options = nil) policy = create_policy(options, QueryPolicy, default_query_policy) nodes = @cluster.nodes if nodes.empty? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing UDF failed because cluster is empty.") @@ -557,11 +556,10 @@ end ExecuteTask.new(@cluster, statement) end - # Create secondary index. # This asynchronous server call will return before command is complete. # The user can optionally wait for command completion by using the returned # IndexTask instance. # @@ -581,16 +579,16 @@ str_cmd << ";indexdata=#{bin_name},#{index_type.to_s.upcase}" str_cmd << ";priority=normal" # Send index command to one node. That node will distribute the command to other nodes. response = send_info_command(policy, str_cmd).upcase - if response == 'OK' + if response == "OK" # Return task that could optionally be polled for completion. return IndexTask.new(@cluster, namespace, index_name) end - if response.start_with?('FAIL:200') + if response.start_with?("FAIL:200") # Index has already been created. Do not need to poll for completion. return IndexTask.new(@cluster, namespace, index_name, true) end raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::INDEX_GENERIC, "Create index failed: #{response}") @@ -605,14 +603,14 @@ str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty? str_cmd << ";indexname=#{index_name}" # Send index command to one node. That node will distribute the command to other nodes. response = send_info_command(policy, str_cmd).upcase - return if response == 'OK' + return if response == "OK" # Index did not previously exist. Return without error. - return if response.start_with?('FAIL:201') + return if response.start_with?("FAIL:201") raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::INDEX_GENERIC, "Drop index failed: #{response}") end def request_info(*commands, policy: nil) @@ -964,9 +962,7 @@ end end threads.each(&:join) end - end # class - end # module