lib/aerospike/client.rb in aerospike-0.1.3 vs lib/aerospike/client.rb in aerospike-0.1.5
- old
+ new
@@ -39,10 +39,12 @@
attr_accessor :default_policy, :default_write_policy
def initialize(host, port, options={})
@default_policy = Policy.new
@default_write_policy = WritePolicy.new
+ @default_scan_policy = ScanPolicy.new
+ @default_query_policy = QueryPolicy.new
policy = opt_to_client_policy(options)
@cluster = Cluster.new(policy, Host.new(host, port))
@@ -400,17 +402,18 @@
str_cmd = "udf-put:filename=#{server_path};content=#{content};"
str_cmd << "content-len=#{content.length};udf-type=#{language};"
# Send UDF to one node. That node will distribute the UDF to other nodes.
response_map = @cluster.request_info(@default_policy, str_cmd)
- response, _ = response_map.first
res = {}
- vals = response.split(';')
- vals.each do |pair|
- k, v = pair.split("=", 2)
- res[k] = v
+ response_map.each do |k, response|
+ vals = response.to_s.split(';')
+ vals.each do |pair|
+ k, v = pair.split("=", 2)
+ res[k] = v
+ end
end
if res['error']
raise Aerospike::Exceptions::CommandRejected.new("Registration failed: #{res['error']}\nFile: #{res['file']}\nLine: #{res['line']}\nMessage: #{res['message']}")
end
@@ -498,11 +501,11 @@
key, obj = result_map.detect{|k, v| k.include?('FAILURE')}
if key
raise Aerospike::Exceptions::Aerospike.new(UDF_BAD_RESPONSE, "#{obj}")
end
- raise Aerospike::Exception::Aerospike.new(UDF_BAD_RESPONSE, "Invalid UDF return value")
+ raise Aerospike::Exceptions::Aerospike.new(UDF_BAD_RESPONSE, "Invalid UDF return value")
end
# Create secondary index.
# This asynchronous server call will return before command is complete.
# The user can optionally wait for command completion by using the returned
@@ -558,10 +561,145 @@
def request_info(*commands)
@cluster.request_info(@default_policy, *commands)
end
+ #-------------------------------------------------------
+ # Scan Operations
+ #-------------------------------------------------------
+
+ def scan_all(namespace, set_name, bin_names=[], options={})
+ policy = opt_to_scan_policy(options)
+
+ # wait until all migrations are finished
+ # TODO: implement
+ # @cluster.WaitUntillMigrationIsFinished(policy.timeout)
+
+ # Retry policy must be one-shot for scans.
+ # copy on write for policy
+ new_policy = policy.clone
+
+ nodes = @cluster.nodes
+ if nodes.length == 0
+ raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.")
+ end
+
+ recordset = Recordset.new(policy.record_queue_size, nodes.length, :scan)
+
+ if policy.concurrent_nodes
+ # Use a thread per node
+ nodes.each do |node|
+ Thread.new do
+ abort_on_exception = true
+ command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset)
+ begin
+ command.execute
+ rescue => e
+ Aerospike.logger.error(e) unless e == Rescordset::SCAN_TERMINATED_EXCEPTION
+ recordset.cancel(e)
+ ensure
+ recordset.thread_finished
+ end
+ end
+ end
+ else
+ Thread.new do
+ abort_on_exception = true
+ nodes.each do |node|
+ command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset)
+ begin
+ command.execute
+ rescue => e
+ Aerospike.logger.error(e) unless e == Rescordset::SCAN_TERMINATED_EXCEPTION
+ recordset.cancel(e)
+ ensure
+ recordset.thread_finished
+ end
+ end
+ end
+ end
+
+ recordset
+ end
+
+ # ScanNode reads all records in specified namespace and set, from one node only.
+ # The policy can be used to specify timeouts.
+ def scan_node(node, namespace, set_name, bin_names=[], options={})
+ policy = opt_to_scan_policy(options)
+ # wait until all migrations are finished
+ # TODO: implement
+ # @cluster.WaitUntillMigrationIsFinished(policy.timeout)
+
+ # Retry policy must be one-shot for scans.
+ # copy on write for policy
+ new_policy = policy.clone
+ new_policy.max_retries = 0
+
+ node = @cluster.get_node_by_name(node) if !node.is_a?(Aerospike::Node)
+
+ recordset = Recordset.new(policy.record_queue_size, 1, :scan)
+
+ Thread.new do
+ abort_on_exception = true
+ command = ScanCommand.new(node, new_policy, namespace, set_name, bin_names, recordset)
+ begin
+ command.execute
+ rescue => e
+ Aerospike.logger.error(e) unless e == Rescordset::SCAN_TERMINATED_EXCEPTION
+ recordset.cancel(e)
+ ensure
+ recordset.thread_finished
+ end
+ end
+
+ recordset
+ end
+
+ #--------------------------------------------------------
+ # Query functions (Supported by Aerospike 3 servers only)
+ #--------------------------------------------------------
+
+ # Query executes a query and returns a recordset.
+ # The query executor puts records on a channel from separate goroutines.
+ # The caller can concurrently pops records off the channel through the
+ # record channel.
+ #
+ # This method is only supported by Aerospike 3 servers.
+ # If the policy is nil, a default policy will be generated.
+ def query(statement, options={})
+ policy = opt_to_query_policy(options)
+ new_policy = policy.clone
+
+ # Always set a taskId
+ statement.task_id = Time.now.to_i if statement.task_id == 0
+
+ nodes = @cluster.nodes
+ if nodes.length == 0
+ raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.")
+ end
+
+ recordset = Recordset.new(policy.record_queue_size, nodes.length, :query)
+
+ # Use a thread per node
+ nodes.each do |node|
+ Thread.new do
+ abort_on_exception = true
+ command = QueryCommand.new(node, new_policy, statement, recordset)
+ begin
+ command.execute
+ rescue => e
+ Aerospike.logger.error(e) unless e == Rescordset::QUERY_TERMINATED_EXCEPTION
+ recordset.cancel(e)
+ ensure
+ recordset.thread_finished
+ end
+ end
+ end
+
+ recordset
+ end
+
private
def send_info_command(policy, command)
@cluster.request_info(@default_policy, command)
end
@@ -578,11 +716,11 @@
end
end
end
def opt_to_client_policy(options)
- if options == {} || options.nil?
+ if options.nil? || options == {}
ClientPolicy.new
elsif options.is_a?(ClientPolicy)
options
elsif options.is_a?(Hash)
ClientPolicy.new(
@@ -592,11 +730,11 @@
)
end
end
def opt_to_policy(options)
- if options == {} || options.nil?
+ if options.nil? || options == {}
@default_policy
elsif options.is_a?(Policy)
options
elsif options.is_a?(Hash)
Policy.new(
@@ -607,11 +745,11 @@
)
end
end
def opt_to_write_policy(options)
- if options == {} || options.nil?
+ if options.nil? || options == {}
@default_write_policy
elsif options.is_a?(WritePolicy)
options
elsif options.is_a?(Hash)
WritePolicy.new(
@@ -619,9 +757,34 @@
options[:gen_policy],
options[:generation],
options[:expiration],
options[:send_key]
)
+ end
+ end
+
+ def opt_to_scan_policy(options)
+ if options.nil? || options == {}
+ @default_scan_policy
+ elsif options.is_a?(ScanPolicy)
+ options
+ elsif options.is_a?(Hash)
+ ScanPolicy.new(
+ options[:scan_percent],
+ options[:concurrent_nodes],
+ options[:include_bin_data],
+ options[:fail_on_cluster_change]
+ )
+ end
+ end
+
+ def opt_to_query_policy(options)
+ if options.nil? || options == {}
+ @default_query_policy
+ elsif options.is_a?(QueryPolicy)
+ options
+ elsif options.is_a?(Hash)
+ QueryPolicy.new()
end
end
def batch_execute(keys, &cmd_gen)
batch_nodes = BatchNode.generate_list(@cluster, keys)