lib/avro/io.rb in avro-1.8.2 vs lib/avro/io.rb in avro-1.9.0

- old
+ new

@@ -3,13 +3,13 @@ # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. @@ -41,13 +41,13 @@ def initialize(reader) @reader = reader end def byte! - @reader.read(1).unpack('C').first + @reader.readbyte end - + def read_null # null is written as zero byte's nil end @@ -74,19 +74,19 @@ def read_float # A float is written as 4 bytes. # The float is converted into a 32-bit integer using a method # equivalent to Java's floatToIntBits and then encoded in # little-endian format. - @reader.read(4).unpack('e')[0] + read_and_unpack(4, 'e'.freeze) end def read_double # A double is written as 8 bytes. # The double is converted into a 64-bit integer using a method # equivalent to Java's doubleToLongBits and then encoded in # little-endian format. - @reader.read(8).unpack('E')[0] + read_and_unpack(8, 'E'.freeze) end def read_bytes # Bytes are encoded as a long followed by that many bytes of # data. @@ -95,11 +95,11 @@ def read_string # A string is encoded as a long followed by that many bytes of # UTF-8 encoded character data. read_bytes.tap do |string| - string.force_encoding("UTF-8") if string.respond_to? :force_encoding + string.force_encoding('UTF-8'.freeze) if string.respond_to? :force_encoding end end def read(len) # Read n bytes @@ -142,10 +142,27 @@ end def skip(n) reader.seek(reader.tell() + n) end + + private + + # Optimize unpacking strings when `unpack1` is available (ruby >= 2.4) + if String.instance_methods.include?(:unpack1) + + def read_and_unpack(byte_count, format) + @reader.read(byte_count).unpack1(format) + end + + else + + def read_and_unpack(byte_count, format) + @reader.read(byte_count).unpack(format)[0] + end + + end end # Write leaf values class BinaryEncoder attr_reader :writer @@ -157,11 +174,11 @@ # null is written as zero bytes def write_null(datum) nil end - # a boolean is written as a single byte + # a boolean is written as a single byte # whose value is either 0 (false) or 1 (true). def write_boolean(datum) on_disk = datum ? 1.chr : 0.chr writer.write(on_disk) end @@ -173,11 +190,10 @@ end # int and long values are written using variable-length, # zig-zag coding. def write_long(n) - foo = n n = (n << 1) ^ (n >> 63) while (n & ~0x7F) != 0 @writer.write(((n & 0x7f) | 0x80).chr) n >>= 7 end @@ -187,19 +203,19 @@ # A float is written as 4 bytes. # The float is converted into a 32-bit integer using a method # equivalent to Java's floatToIntBits and then encoded in # little-endian format. def write_float(datum) - @writer.write([datum].pack('e')) + @writer.write([datum].pack('e'.freeze)) end # A double is written as 8 bytes. # The double is converted into a 64-bit integer using a method # equivalent to Java's doubleToLongBits and then encoded in # little-endian format. def write_double(datum) - @writer.write([datum].pack('E')) + @writer.write([datum].pack('E'.freeze)) end # Bytes are encoded as a long followed by that many bytes of data. def write_bytes(datum) write_long(datum.bytesize) @@ -207,11 +223,11 @@ end # A string is encoded as a long followed by that many bytes of # UTF-8 encoded character data def write_string(datum) - datum = datum.encode('utf-8') if datum.respond_to? :encode + datum = datum.encode('utf-8'.freeze) if datum.respond_to? :encode write_bytes(datum) end # Write an arbritary datum. def write(datum) @@ -219,50 +235,11 @@ end end class DatumReader def self.match_schemas(writers_schema, readers_schema) - w_type = writers_schema.type_sym - r_type = readers_schema.type_sym - - # This conditional is begging for some OO love. - if w_type == :union || r_type == :union - return true - end - - if w_type == r_type - return true if Schema::PRIMITIVE_TYPES_SYM.include?(r_type) - - case r_type - when :record - return writers_schema.fullname == readers_schema.fullname - when :error - return writers_schema.fullname == readers_schema.fullname - when :request - return true - when :fixed - return writers_schema.fullname == readers_schema.fullname && - writers_schema.size == readers_schema.size - when :enum - return writers_schema.fullname == readers_schema.fullname - when :map - return writers_schema.values.type == readers_schema.values.type - when :array - return writers_schema.items.type == readers_schema.items.type - end - end - - # Handle schema promotion - if w_type == :int && [:long, :float, :double].include?(r_type) - return true - elsif w_type == :long && [:float, :double].include?(r_type) - return true - elsif w_type == :float && r_type == :double - return true - end - - return false + Avro::SchemaCompatibility.match_schemas(writers_schema, readers_schema) end attr_accessor :writers_schema, :readers_schema def initialize(writers_schema=nil, readers_schema=nil) @@ -291,11 +268,11 @@ raise SchemaMatchException.new(writers_schema, readers_schema) end # function dispatch for reading data based on type of writer's # schema - case writers_schema.type_sym + datum = case writers_schema.type_sym when :null; decoder.read_null when :boolean; decoder.read_boolean when :string; decoder.read_string when :int; decoder.read_int when :long; decoder.read_long @@ -309,10 +286,12 @@ when :union; read_union(writers_schema, readers_schema, decoder) when :record, :error, :request; read_record(writers_schema, readers_schema, decoder) else raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}" end + + readers_schema.type_adapter.decode(datum) end def read_fixed(writers_schema, readers_schema, decoder) decoder.read(writers_schema.size) end @@ -334,11 +313,11 @@ read_items = [] block_count = decoder.read_long while block_count != 0 if block_count < 0 block_count = -block_count - block_size = decoder.read_long + _block_size = decoder.read_long end block_count.times do read_items << read_data(writers_schema.items, readers_schema.items, decoder) @@ -353,11 +332,11 @@ read_items = {} block_count = decoder.read_long while block_count != 0 if block_count < 0 block_count = -block_count - block_size = decoder.read_long + _block_size = decoder.read_long end block_count.times do key = decoder.read_string read_items[key] = read_data(writers_schema.values, readers_schema.values, @@ -391,28 +370,24 @@ # fill in the default values if readers_fields_hash.size > read_record.size writers_fields_hash = writers_schema.fields_hash readers_fields_hash.each do |field_name, field| unless writers_fields_hash.has_key? field_name - if !field.default.nil? + if field.default? field_val = read_default_value(field.type, field.default) read_record[field.name] = field_val else - # FIXME(jmhodges) another 'unset' here + raise AvroError, "Missing data for #{field.type} with no default" end end end end read_record end def read_default_value(field_schema, default_value) - if default_value == :no_default - raise AvroError, "Missing data for #{field_schema} with no default" - end - # Basically a JSON Decoder? case field_schema.type_sym when :null return nil when :boolean @@ -522,11 +497,11 @@ block_count = decoder.read_long while block_count != 0 if block_count < 0 decoder.skip(decoder.read_long) else - block_count.times &blk + block_count.times(&blk) end block_count = decoder.read_long end end end # DatumReader @@ -540,12 +515,14 @@ def write(datum, encoder) write_data(writers_schema, datum, encoder) end - def write_data(writers_schema, datum, encoder) - unless Schema.validate(writers_schema, datum) + def write_data(writers_schema, logical_datum, encoder) + datum = writers_schema.type_adapter.encode(logical_datum) + + unless Schema.validate(writers_schema, datum, { recursive: false, encoded: true }) raise AvroTypeError.new(writers_schema, datum) end # function dispatch to write datum case writers_schema.type_sym @@ -576,20 +553,22 @@ index_of_datum = writers_schema.symbols.index(datum) encoder.write_int(index_of_datum) end def write_array(writers_schema, datum, encoder) + raise AvroTypeError.new(writers_schema, datum) unless datum.is_a?(Array) if datum.size > 0 encoder.write_long(datum.size) datum.each do |item| write_data(writers_schema.items, item, encoder) end end encoder.write_long(0) end def write_map(writers_schema, datum, encoder) + raise AvroTypeError.new(writers_schema, datum) unless datum.is_a?(Hash) if datum.size > 0 encoder.write_long(datum.size) datum.each do |k,v| encoder.write_string(k) write_data(writers_schema.values, v, encoder) @@ -608,9 +587,10 @@ encoder.write_long(index_of_schema) write_data(writers_schema.schemas[index_of_schema], datum, encoder) end def write_record(writers_schema, datum, encoder) + raise AvroTypeError.new(writers_schema, datum) unless datum.is_a?(Hash) writers_schema.fields.each do |field| write_data(field.type, datum[field.name], encoder) end end end # DatumWriter