lib/avromatic/io/datum_reader.rb in avromatic-2.2.6 vs lib/avromatic/io/datum_reader.rb in avromatic-2.3.0
- old
+ new
@@ -1,88 +1,38 @@
# frozen_string_literal: true
-# rubocop:disable Style/WhenThen
module Avromatic
module IO
# Subclass DatumReader to include additional information about the union
# member index used. The code modified below is based on salsify/avro,
# branch 'salsify-master' with the tag 'v1.9.0.3'
class DatumReader < Avro::IO::DatumReader
- UNION_MEMBER_INDEX = Avromatic::IO::UNION_MEMBER_INDEX
-
- def read_data(writers_schema, readers_schema, decoder, initial_record = nil)
- # schema matching
- unless self.class.match_schemas(writers_schema, readers_schema)
- raise Avro::IO::SchemaMatchException.new(writers_schema, readers_schema)
- end
-
+ def read_data(writers_schema, readers_schema, decoder)
# schema resolution: reader's schema is a union, writer's schema is not
- if writers_schema.type_sym != :union && readers_schema.type_sym == :union
- rs_index = readers_schema.schemas.find_index do |s|
- self.class.match_schemas(writers_schema, s)
- end
+ return super unless writers_schema.type_sym != :union && readers_schema.type_sym == :union
- optional = readers_schema.schemas.first.type_sym == :null
- union_info = if readers_schema.schemas.size == 2 && optional
- # Avromatic does not treat the union of null and 1 other type as a union
- nil
- elsif optional
- # Avromatic does not treat the null of an optional field as part of the union
- { UNION_MEMBER_INDEX => rs_index - 1 }
- else
- { UNION_MEMBER_INDEX => rs_index }
- end
-
- return read_data(writers_schema, readers_schema.schemas[rs_index], decoder, union_info) if rs_index
- raise Avro::IO::SchemaMatchException.new(writers_schema, readers_schema)
+ rs_index = readers_schema.schemas.find_index do |s|
+ self.class.match_schemas(writers_schema, s)
end
- # function dispatch for reading data based on type of writer's schema
- datum = case writers_schema.type_sym
- when :null; decoder.read_null
- when :boolean; decoder.read_boolean
- when :string; decoder.read_string
- when :int; decoder.read_int
- when :long; decoder.read_long
- when :float; decoder.read_float
- when :double; decoder.read_double
- when :bytes; decoder.read_bytes
- when :fixed; read_fixed(writers_schema, readers_schema, decoder)
- when :enum; read_enum(writers_schema, readers_schema, decoder)
- when :array; read_array(writers_schema, readers_schema, decoder)
- when :map; read_map(writers_schema, readers_schema, decoder)
- when :union; read_union(writers_schema, readers_schema, decoder)
- when :record, :error, :request; read_record(writers_schema, readers_schema, decoder, initial_record || {})
- else
- raise Avro::AvroError.new("Cannot read unknown schema type: #{writers_schema.type}")
- end
+ raise Avro::IO::SchemaMatchException.new(writers_schema, readers_schema) unless rs_index
- # Allow this code to be used with an official Avro release or the
- # avro-patches gem that includes logical_type support.
- if readers_schema.respond_to?(:logical_type)
- readers_schema.type_adapter.decode(datum)
- else
+ datum = read_data(writers_schema, readers_schema.schemas[rs_index], decoder)
+ optional = readers_schema.schemas.first.type_sym == :null
+
+ if readers_schema.schemas.size == 2 && optional
+ # Avromatic does not treat the union of null and 1 other type as a union
datum
+ elsif datum.nil?
+ # Avromatic does not treat the null of an optional field as part of the union
+ nil
+ else
+ # Avromatic does not treat the null of an optional field as part of the union so
+ # adjust the member index accordingly
+ member_index = optional ? rs_index - 1 : rs_index
+ Avromatic::IO::UnionDatum.new(member_index, datum)
end
end
-
- # Override to specify an initial record that may contain union index
- def read_record(writers_schema, readers_schema, decoder, initial_record = {})
- readers_fields_hash = readers_schema.fields_hash
- read_record = Avromatic.use_custom_datum_reader ? initial_record : {}
- writers_schema.fields.each do |field|
- readers_field = readers_fields_hash[field.name]
- if readers_field
- field_val = read_data(field.type, readers_field.type, decoder)
- read_record[field.name] = field_val
- else
- skip_data(field.type, decoder)
- end
- end
-
- read_record
- end
end
end
end
-# rubocop:enable Style/WhenThen