lib/avro/io.rb in avro-1.8.2 vs lib/avro/io.rb in avro-1.9.0
- old
+ new
@@ -3,13 +3,13 @@
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -41,13 +41,13 @@
def initialize(reader)
@reader = reader
end
def byte!
- @reader.read(1).unpack('C').first
+ @reader.readbyte
end
-
+
def read_null
# null is written as zero byte's
nil
end
@@ -74,19 +74,19 @@
def read_float
# A float is written as 4 bytes.
# The float is converted into a 32-bit integer using a method
# equivalent to Java's floatToIntBits and then encoded in
# little-endian format.
- @reader.read(4).unpack('e')[0]
+ read_and_unpack(4, 'e'.freeze)
end
def read_double
# A double is written as 8 bytes.
# The double is converted into a 64-bit integer using a method
# equivalent to Java's doubleToLongBits and then encoded in
# little-endian format.
- @reader.read(8).unpack('E')[0]
+ read_and_unpack(8, 'E'.freeze)
end
def read_bytes
# Bytes are encoded as a long followed by that many bytes of
# data.
@@ -95,11 +95,11 @@
def read_string
# A string is encoded as a long followed by that many bytes of
# UTF-8 encoded character data.
read_bytes.tap do |string|
- string.force_encoding("UTF-8") if string.respond_to? :force_encoding
+ string.force_encoding('UTF-8'.freeze) if string.respond_to? :force_encoding
end
end
def read(len)
# Read n bytes
@@ -142,10 +142,27 @@
end
def skip(n)
reader.seek(reader.tell() + n)
end
+
+ private
+
+ # Optimize unpacking strings when `unpack1` is available (ruby >= 2.4)
+ if String.instance_methods.include?(:unpack1)
+
+ def read_and_unpack(byte_count, format)
+ @reader.read(byte_count).unpack1(format)
+ end
+
+ else
+
+ def read_and_unpack(byte_count, format)
+ @reader.read(byte_count).unpack(format)[0]
+ end
+
+ end
end
# Write leaf values
class BinaryEncoder
attr_reader :writer
@@ -157,11 +174,11 @@
# null is written as zero bytes
def write_null(datum)
nil
end
- # a boolean is written as a single byte
+ # a boolean is written as a single byte
# whose value is either 0 (false) or 1 (true).
def write_boolean(datum)
on_disk = datum ? 1.chr : 0.chr
writer.write(on_disk)
end
@@ -173,11 +190,10 @@
end
# int and long values are written using variable-length,
# zig-zag coding.
def write_long(n)
- foo = n
n = (n << 1) ^ (n >> 63)
while (n & ~0x7F) != 0
@writer.write(((n & 0x7f) | 0x80).chr)
n >>= 7
end
@@ -187,19 +203,19 @@
# A float is written as 4 bytes.
# The float is converted into a 32-bit integer using a method
# equivalent to Java's floatToIntBits and then encoded in
# little-endian format.
def write_float(datum)
- @writer.write([datum].pack('e'))
+ @writer.write([datum].pack('e'.freeze))
end
# A double is written as 8 bytes.
# The double is converted into a 64-bit integer using a method
# equivalent to Java's doubleToLongBits and then encoded in
# little-endian format.
def write_double(datum)
- @writer.write([datum].pack('E'))
+ @writer.write([datum].pack('E'.freeze))
end
# Bytes are encoded as a long followed by that many bytes of data.
def write_bytes(datum)
write_long(datum.bytesize)
@@ -207,11 +223,11 @@
end
# A string is encoded as a long followed by that many bytes of
# UTF-8 encoded character data
def write_string(datum)
- datum = datum.encode('utf-8') if datum.respond_to? :encode
+ datum = datum.encode('utf-8'.freeze) if datum.respond_to? :encode
write_bytes(datum)
end
# Write an arbritary datum.
def write(datum)
@@ -219,50 +235,11 @@
end
end
class DatumReader
def self.match_schemas(writers_schema, readers_schema)
- w_type = writers_schema.type_sym
- r_type = readers_schema.type_sym
-
- # This conditional is begging for some OO love.
- if w_type == :union || r_type == :union
- return true
- end
-
- if w_type == r_type
- return true if Schema::PRIMITIVE_TYPES_SYM.include?(r_type)
-
- case r_type
- when :record
- return writers_schema.fullname == readers_schema.fullname
- when :error
- return writers_schema.fullname == readers_schema.fullname
- when :request
- return true
- when :fixed
- return writers_schema.fullname == readers_schema.fullname &&
- writers_schema.size == readers_schema.size
- when :enum
- return writers_schema.fullname == readers_schema.fullname
- when :map
- return writers_schema.values.type == readers_schema.values.type
- when :array
- return writers_schema.items.type == readers_schema.items.type
- end
- end
-
- # Handle schema promotion
- if w_type == :int && [:long, :float, :double].include?(r_type)
- return true
- elsif w_type == :long && [:float, :double].include?(r_type)
- return true
- elsif w_type == :float && r_type == :double
- return true
- end
-
- return false
+ Avro::SchemaCompatibility.match_schemas(writers_schema, readers_schema)
end
attr_accessor :writers_schema, :readers_schema
def initialize(writers_schema=nil, readers_schema=nil)
@@ -291,11 +268,11 @@
raise SchemaMatchException.new(writers_schema, readers_schema)
end
# function dispatch for reading data based on type of writer's
# schema
- case writers_schema.type_sym
+ datum = case writers_schema.type_sym
when :null; decoder.read_null
when :boolean; decoder.read_boolean
when :string; decoder.read_string
when :int; decoder.read_int
when :long; decoder.read_long
@@ -309,10 +286,12 @@
when :union; read_union(writers_schema, readers_schema, decoder)
when :record, :error, :request; read_record(writers_schema, readers_schema, decoder)
else
raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
end
+
+ readers_schema.type_adapter.decode(datum)
end
def read_fixed(writers_schema, readers_schema, decoder)
decoder.read(writers_schema.size)
end
@@ -334,11 +313,11 @@
read_items = []
block_count = decoder.read_long
while block_count != 0
if block_count < 0
block_count = -block_count
- block_size = decoder.read_long
+ _block_size = decoder.read_long
end
block_count.times do
read_items << read_data(writers_schema.items,
readers_schema.items,
decoder)
@@ -353,11 +332,11 @@
read_items = {}
block_count = decoder.read_long
while block_count != 0
if block_count < 0
block_count = -block_count
- block_size = decoder.read_long
+ _block_size = decoder.read_long
end
block_count.times do
key = decoder.read_string
read_items[key] = read_data(writers_schema.values,
readers_schema.values,
@@ -391,28 +370,24 @@
# fill in the default values
if readers_fields_hash.size > read_record.size
writers_fields_hash = writers_schema.fields_hash
readers_fields_hash.each do |field_name, field|
unless writers_fields_hash.has_key? field_name
- if !field.default.nil?
+ if field.default?
field_val = read_default_value(field.type, field.default)
read_record[field.name] = field_val
else
- # FIXME(jmhodges) another 'unset' here
+ raise AvroError, "Missing data for #{field.type} with no default"
end
end
end
end
read_record
end
def read_default_value(field_schema, default_value)
- if default_value == :no_default
- raise AvroError, "Missing data for #{field_schema} with no default"
- end
-
# Basically a JSON Decoder?
case field_schema.type_sym
when :null
return nil
when :boolean
@@ -522,11 +497,11 @@
block_count = decoder.read_long
while block_count != 0
if block_count < 0
decoder.skip(decoder.read_long)
else
- block_count.times &blk
+ block_count.times(&blk)
end
block_count = decoder.read_long
end
end
end # DatumReader
@@ -540,12 +515,14 @@
def write(datum, encoder)
write_data(writers_schema, datum, encoder)
end
- def write_data(writers_schema, datum, encoder)
- unless Schema.validate(writers_schema, datum)
+ def write_data(writers_schema, logical_datum, encoder)
+ datum = writers_schema.type_adapter.encode(logical_datum)
+
+ unless Schema.validate(writers_schema, datum, { recursive: false, encoded: true })
raise AvroTypeError.new(writers_schema, datum)
end
# function dispatch to write datum
case writers_schema.type_sym
@@ -576,20 +553,22 @@
index_of_datum = writers_schema.symbols.index(datum)
encoder.write_int(index_of_datum)
end
def write_array(writers_schema, datum, encoder)
+ raise AvroTypeError.new(writers_schema, datum) unless datum.is_a?(Array)
if datum.size > 0
encoder.write_long(datum.size)
datum.each do |item|
write_data(writers_schema.items, item, encoder)
end
end
encoder.write_long(0)
end
def write_map(writers_schema, datum, encoder)
+ raise AvroTypeError.new(writers_schema, datum) unless datum.is_a?(Hash)
if datum.size > 0
encoder.write_long(datum.size)
datum.each do |k,v|
encoder.write_string(k)
write_data(writers_schema.values, v, encoder)
@@ -608,9 +587,10 @@
encoder.write_long(index_of_schema)
write_data(writers_schema.schemas[index_of_schema], datum, encoder)
end
def write_record(writers_schema, datum, encoder)
+ raise AvroTypeError.new(writers_schema, datum) unless datum.is_a?(Hash)
writers_schema.fields.each do |field|
write_data(field.type, datum[field.name], encoder)
end
end
end # DatumWriter