lib/aerospike/command/batch_command.rb in aerospike-0.1.3 vs lib/aerospike/command/batch_command.rb in aerospike-0.1.5

- old
+ new

@@ -29,11 +29,10 @@ def initialize(node) super(node) @valid = true @mutex = Mutex.new - @records = Queue.new self end def parse_result @@ -77,14 +76,83 @@ when Aerospike::FieldType::NAMESPACE namespace = @data_buffer.read(1, size).force_encoding('utf-8') when Aerospike::FieldType::TABLE set_name = @data_buffer.read(1, size).force_encoding('utf-8') when Aerospike::FieldType::KEY - user_key = Value.bytes_to_key_value(@data_buffer.read(1).ord, @data_buffer, 2, size-1) + user_key = Aerospike::bytes_to_key_value(@data_buffer.read(1).ord, @data_buffer, 2, size-1) end end Aerospike::Key.new(namespace, set_name, user_key, digest) + end + + # Parses the given byte buffer and populate the result object. + # Returns the number of bytes that were parsed from the given buffer. + def parse_record(key, op_count, generation, expiration) + bins = nil + duplicates = nil + + for i in 0...op_count + raise Aerospike::Exceptions::QueryTerminated.new unless valid? + + read_bytes(8) + + op_size = @data_buffer.read_int32(0).ord + particle_type = @data_buffer.read(5).ord + version = @data_buffer.read(6).ord + name_size = @data_buffer.read(7).ord + + read_bytes(name_size) + name = @data_buffer.read(0, name_size).force_encoding('utf-8') + + particle_bytes_size = op_size - (4 + name_size) + read_bytes(particle_bytes_size) + value = Aerospike.bytes_to_particle(particle_type, @data_buffer, 0, particle_bytes_size) + + # Currently, the batch command returns all the bins even if a subset of + # the bins are requested. We have to filter it on the client side. + # TODO: Filter batch bins on server! + # if !@bin_names || @bin_names.any?{|bn| bn == name} + # if !@bin_names || (@bin_names == []) || @bin_names.any?{|bn| bn == name} + if !@bin_names || (@bin_names.empty?) || @bin_names.any?{|bn| bn == name} + + vmap = nil + + if version > 0 || duplicates + unless duplicates + duplicates = [] + duplicates << bins + bins = nil + + for j in 0...version + duplicates << nil + end + else + for j in duplicates.length..version + duplicates << nil + end + end + + vmap = duplicates[version] + unless vmap + vmap = {} + duplicates[version] = vmap + end + else + unless bins + bins = {} + end + vmap = bins + end + vmap[name] = value + end + end + + # Remove nil duplicates just in case there were holes in the version number space. + # TODO: this seems to be a bad idea; O(n) algorithm after another O(n) algorithm + duplicates.compact! if duplicates + + Record.new(@node, key, bins, duplicates, generation, expiration) end def read_bytes(length) if length > @data_buffer.length # Corrupted data streams can result in a huge length.