lib/aerospike/command/batch_command.rb in aerospike-0.1.3 vs lib/aerospike/command/batch_command.rb in aerospike-0.1.5
- old
+ new
@@ -29,11 +29,10 @@
def initialize(node)
super(node)
@valid = true
@mutex = Mutex.new
- @records = Queue.new
self
end
def parse_result
@@ -77,14 +76,83 @@
when Aerospike::FieldType::NAMESPACE
namespace = @data_buffer.read(1, size).force_encoding('utf-8')
when Aerospike::FieldType::TABLE
set_name = @data_buffer.read(1, size).force_encoding('utf-8')
when Aerospike::FieldType::KEY
- user_key = Value.bytes_to_key_value(@data_buffer.read(1).ord, @data_buffer, 2, size-1)
+ user_key = Aerospike::bytes_to_key_value(@data_buffer.read(1).ord, @data_buffer, 2, size-1)
end
end
Aerospike::Key.new(namespace, set_name, user_key, digest)
+ end
+
+ # Parses the given byte buffer and populate the result object.
+ # Returns the number of bytes that were parsed from the given buffer.
+ def parse_record(key, op_count, generation, expiration)
+ bins = nil
+ duplicates = nil
+
+ for i in 0...op_count
+ raise Aerospike::Exceptions::QueryTerminated.new unless valid?
+
+ read_bytes(8)
+
+ op_size = @data_buffer.read_int32(0).ord
+ particle_type = @data_buffer.read(5).ord
+ version = @data_buffer.read(6).ord
+ name_size = @data_buffer.read(7).ord
+
+ read_bytes(name_size)
+ name = @data_buffer.read(0, name_size).force_encoding('utf-8')
+
+ particle_bytes_size = op_size - (4 + name_size)
+ read_bytes(particle_bytes_size)
+ value = Aerospike.bytes_to_particle(particle_type, @data_buffer, 0, particle_bytes_size)
+
+ # Currently, the batch command returns all the bins even if a subset of
+ # the bins are requested. We have to filter it on the client side.
+ # TODO: Filter batch bins on server!
+ # if !@bin_names || @bin_names.any?{|bn| bn == name}
+ # if !@bin_names || (@bin_names == []) || @bin_names.any?{|bn| bn == name}
+ if !@bin_names || (@bin_names.empty?) || @bin_names.any?{|bn| bn == name}
+
+ vmap = nil
+
+ if version > 0 || duplicates
+ unless duplicates
+ duplicates = []
+ duplicates << bins
+ bins = nil
+
+ for j in 0...version
+ duplicates << nil
+ end
+ else
+ for j in duplicates.length..version
+ duplicates << nil
+ end
+ end
+
+ vmap = duplicates[version]
+ unless vmap
+ vmap = {}
+ duplicates[version] = vmap
+ end
+ else
+ unless bins
+ bins = {}
+ end
+ vmap = bins
+ end
+ vmap[name] = value
+ end
+ end
+
+ # Remove nil duplicates just in case there were holes in the version number space.
+ # TODO: this seems to be a bad idea; O(n) algorithm after another O(n) algorithm
+ duplicates.compact! if duplicates
+
+ Record.new(@node, key, bins, duplicates, generation, expiration)
end
def read_bytes(length)
if length > @data_buffer.length
# Corrupted data streams can result in a huge length.