lib/aerospike/client.rb in aerospike-0.1.3 vs lib/aerospike/client.rb in aerospike-0.1.5

- old
+ new

@@ -39,10 +39,12 @@ attr_accessor :default_policy, :default_write_policy def initialize(host, port, options={}) @default_policy = Policy.new @default_write_policy = WritePolicy.new + @default_scan_policy = ScanPolicy.new + @default_query_policy = QueryPolicy.new policy = opt_to_client_policy(options) @cluster = Cluster.new(policy, Host.new(host, port)) @@ -400,17 +402,18 @@ str_cmd = "udf-put:filename=#{server_path};content=#{content};" str_cmd << "content-len=#{content.length};udf-type=#{language};" # Send UDF to one node. That node will distribute the UDF to other nodes. response_map = @cluster.request_info(@default_policy, str_cmd) - response, _ = response_map.first res = {} - vals = response.split(';') - vals.each do |pair| - k, v = pair.split("=", 2) - res[k] = v + response_map.each do |k, response| + vals = response.to_s.split(';') + vals.each do |pair| + k, v = pair.split("=", 2) + res[k] = v + end end if res['error'] raise Aerospike::Exceptions::CommandRejected.new("Registration failed: #{res['error']}\nFile: #{res['file']}\nLine: #{res['line']}\nMessage: #{res['message']}") end @@ -498,11 +501,11 @@ key, obj = result_map.detect{|k, v| k.include?('FAILURE')} if key raise Aerospike::Exceptions::Aerospike.new(UDF_BAD_RESPONSE, "#{obj}") end - raise Aerospike::Exception::Aerospike.new(UDF_BAD_RESPONSE, "Invalid UDF return value") + raise Aerospike::Exceptions::Aerospike.new(UDF_BAD_RESPONSE, "Invalid UDF return value") end # Create secondary index. # This asynchronous server call will return before command is complete. # The user can optionally wait for command completion by using the returned @@ -558,10 +561,145 @@ def request_info(*commands) @cluster.request_info(@default_policy, *commands) end + #------------------------------------------------------- + # Scan Operations + #------------------------------------------------------- + + def scan_all(namespace, set_name, bin_names=[], options={}) + policy = opt_to_scan_policy(options) + + # wait until all migrations are finished + # TODO: implement + # @cluster.WaitUntillMigrationIsFinished(policy.timeout) + + # Retry policy must be one-shot for scans. + # copy on write for policy + new_policy = policy.clone + + nodes = @cluster.nodes + if nodes.length == 0 + raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.") + end + + recordset = Recordset.new(policy.record_queue_size, nodes.length, :scan) + + if policy.concurrent_nodes + # Use a thread per node + nodes.each do |node| + Thread.new do + abort_on_exception = true + command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset) + begin + command.execute + rescue => e + Aerospike.logger.error(e) unless e == Rescordset::SCAN_TERMINATED_EXCEPTION + recordset.cancel(e) + ensure + recordset.thread_finished + end + end + end + else + Thread.new do + abort_on_exception = true + nodes.each do |node| + command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset) + begin + command.execute + rescue => e + Aerospike.logger.error(e) unless e == Rescordset::SCAN_TERMINATED_EXCEPTION + recordset.cancel(e) + ensure + recordset.thread_finished + end + end + end + end + + recordset + end + + # ScanNode reads all records in specified namespace and set, from one node only. + # The policy can be used to specify timeouts. + def scan_node(node, namespace, set_name, bin_names=[], options={}) + policy = opt_to_scan_policy(options) + # wait until all migrations are finished + # TODO: implement + # @cluster.WaitUntillMigrationIsFinished(policy.timeout) + + # Retry policy must be one-shot for scans. + # copy on write for policy + new_policy = policy.clone + new_policy.max_retries = 0 + + node = @cluster.get_node_by_name(node) if !node.is_a?(Aerospike::Node) + + recordset = Recordset.new(policy.record_queue_size, 1, :scan) + + Thread.new do + abort_on_exception = true + command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset) + begin + command.execute + rescue => e + Aerospike.logger.error(e) unless e == Rescordset::SCAN_TERMINATED_EXCEPTION + recordset.cancel(e) + ensure + recordset.thread_finished + end + end + + recordset + end + + #-------------------------------------------------------- + # Query functions (Supported by Aerospike 3 servers only) + #-------------------------------------------------------- + + # Query executes a query and returns a recordset. + # The query executor puts records on a channel from separate goroutines. + # The caller can concurrently pops records off the channel through the + # record channel. + # + # This method is only supported by Aerospike 3 servers. + # If the policy is nil, a default policy will be generated. + def query(statement, options={}) + policy = opt_to_query_policy(options) + new_policy = policy.clone + + # Always set a taskId + statement.task_id = Time.now.to_i if statement.task_id == 0 + + nodes = @cluster.nodes + if nodes.length == 0 + raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.") + end + + recordset = Recordset.new(policy.record_queue_size, nodes.length, :query) + + # Use a thread per node + nodes.each do |node| + Thread.new do + abort_on_exception = true + command = QueryCommand.new(node, new_policy, statement, recordset) + begin + command.execute + rescue => e + Aerospike.logger.error(e) unless e == Rescordset::QUERY_TERMINATED_EXCEPTION + recordset.cancel(e) + ensure + recordset.thread_finished + end + end + end + + recordset + end + private def send_info_command(policy, command) @cluster.request_info(@default_policy, command) end @@ -578,11 +716,11 @@ end end end def opt_to_client_policy(options) - if options == {} || options.nil? + if options.nil? || options == {} ClientPolicy.new elsif options.is_a?(ClientPolicy) options elsif options.is_a?(Hash) ClientPolicy.new( @@ -592,11 +730,11 @@ ) end end def opt_to_policy(options) - if options == {} || options.nil? + if options.nil? || options == {} @default_policy elsif options.is_a?(Policy) options elsif options.is_a?(Hash) Policy.new( @@ -607,11 +745,11 @@ ) end end def opt_to_write_policy(options) - if options == {} || options.nil? + if options.nil? || options == {} @default_write_policy elsif options.is_a?(WritePolicy) options elsif options.is_a?(Hash) WritePolicy.new( @@ -619,9 +757,34 @@ options[:gen_policy], options[:generation], options[:expiration], options[:send_key] ) + end + end + + def opt_to_scan_policy(options) + if options.nil? || options == {} + @default_scan_policy + elsif options.is_a?(ScanPolicy) + options + elsif options.is_a?(Hash) + ScanPolicy.new( + options[:scan_percent], + options[:concurrent_nodes], + options[:include_bin_data], + options[:fail_on_cluster_change] + ) + end + end + + def opt_to_query_policy(options) + if options.nil? || options == {} + @default_query_policy + elsif options.is_a?(QueryPolicy) + options + elsif options.is_a?(Hash) + QueryPolicy.new() end end def batch_execute(keys, &cmd_gen) batch_nodes = BatchNode.generate_list(@cluster, keys)