lib/avro/io.rb in avro-1.3.0 vs lib/avro/io.rb in avro-1.3.3
- old
+ new
@@ -74,33 +74,19 @@
def read_float
# A float is written as 4 bytes.
# The float is converted into a 32-bit integer using a method
# equivalent to Java's floatToIntBits and then encoded in
# little-endian format.
-
- bits = (byte! & 0xFF) |
- ((byte! & 0xff) << 8) |
- ((byte! & 0xff) << 16) |
- ((byte! & 0xff) << 24)
- [bits].pack('i').unpack('e')[0]
+ @reader.read(4).unpack('e')[0]
end
def read_double
# A double is written as 8 bytes.
# The double is converted into a 64-bit integer using a method
# equivalent to Java's doubleToLongBits and then encoded in
# little-endian format.
-
- bits = (byte! & 0xFF) |
- ((byte! & 0xff) << 8) |
- ((byte! & 0xff) << 16) |
- ((byte! & 0xff) << 24) |
- ((byte! & 0xff) << 32) |
- ((byte! & 0xff) << 40) |
- ((byte! & 0xff) << 48) |
- ((byte! & 0xff) << 56)
- [bits].pack('Q').unpack('d')[0]
+ @reader.read(8).unpack('E')[0]
end
def read_bytes
# Bytes are encoded as a long followed by that many bytes of
# data.
@@ -200,31 +186,19 @@
# A float is written as 4 bytes.
# The float is converted into a 32-bit integer using a method
# equivalent to Java's floatToIntBits and then encoded in
# little-endian format.
def write_float(datum)
- bits = [datum].pack('e').unpack('i')[0]
- @writer.write(((bits ) & 0xFF).chr)
- @writer.write(((bits >> 8 ) & 0xFF).chr)
- @writer.write(((bits >> 16) & 0xFF).chr)
- @writer.write(((bits >> 24) & 0xFF).chr)
+ @writer.write([datum].pack('e'))
end
# A double is written as 8 bytes.
# The double is converted into a 64-bit integer using a method
# equivalent to Java's doubleToLongBits and then encoded in
# little-endian format.
def write_double(datum)
- bits = [datum].pack('d').unpack('Q')[0]
- @writer.write(((bits ) & 0xFF).chr)
- @writer.write(((bits >> 8 ) & 0xFF).chr)
- @writer.write(((bits >> 16) & 0xFF).chr)
- @writer.write(((bits >> 24) & 0xFF).chr)
- @writer.write(((bits >> 32) & 0xFF).chr)
- @writer.write(((bits >> 40) & 0xFF).chr)
- @writer.write(((bits >> 48) & 0xFF).chr)
- @writer.write(((bits >> 56) & 0xFF).chr)
+ @writer.write([datum].pack('E'))
end
# Bytes are encoded as a long followed by that many bytes of data.
def write_bytes(datum)
write_long(datum.size)
@@ -245,46 +219,47 @@
end
class DatumReader
def self.check_props(schema_one, schema_two, prop_list)
prop_list.all? do |prop|
- schema_one.to_hash[prop] == schema_two.to_hash[prop]
+ schema_one.send(prop) == schema_two.send(prop)
end
end
def self.match_schemas(writers_schema, readers_schema)
w_type = writers_schema.type
r_type = readers_schema.type
# This conditional is begging for some OO love.
- if [w_type, r_type].include? 'union'
+ if w_type == 'union' || r_type == 'union'
return true
- elsif Schema::PRIMITIVE_TYPES.include?(w_type) &&
- Schema::PRIMITIVE_TYPES.include?(r_type) &&
- w_type == r_type
- return true
- elsif (w_type == r_type) && (r_type == 'record') &&
- check_props(writers_schema, readers_schema, ['fullname'])
- return true
- elsif w_type == r_type && r_type == 'error' && check_props(writers_scheam, readers_schema, ['fullname'])
- return true
- elsif w_type == r_type && r_type == 'request'
- return true
- elsif (w_type == r_type) && (r_type == 'fixed') &&
- check_props(writers_schema, readers_schema, ['fullname', 'size'])
- return true
- elsif (w_type == r_type) && (r_type == 'enum') &&
- check_props(writers_schema, readers_schema, ['fullname'])
- return true
- elsif (w_type == r_type) && (r_type == 'map') &&
- check_props(writers_schema.values, readers_schema.values, ['type'])
- return true
- elsif (w_type == r_type) && (r_type == 'array') &&
- check_props(writers_schema.items, readers_schema.items, ['type'])
- return true
end
+ if w_type == r_type
+ if Schema::PRIMITIVE_TYPES.include?(w_type) &&
+ Schema::PRIMITIVE_TYPES.include?(r_type)
+ return true
+ end
+
+ case r_type
+ when 'record'
+ return check_props(writers_schema, readers_schema, [:fullname])
+ when 'error'
+ return check_props(writers_scheam, readers_schema, [:fullname])
+ when 'request'
+ return true
+ when 'fixed'
+ return check_props(writers_schema, readers_schema, [:fullname, :size])
+ when 'enum'
+ return check_props(writers_schema, readers_schema, [:fullname])
+ when 'map'
+ return check_props(writers_schema.values, readers_schema.values, [:type])
+ when 'array'
+ return check_props(writers_schema.items, readers_schema.items, [:type])
+ end
+ end
+
# Handle schema promotion
if w_type == 'int' && ['long', 'float', 'double'].include?(r_type)
return true
elsif w_type == 'long' && ['float', 'double'].include?(r_type)
return true
@@ -422,11 +397,10 @@
# fill in the default values
if readers_fields_hash.size > read_record.size
writers_fields_hash = writers_schema.fields_hash
readers_fields_hash.each do |field_name, field|
-
unless writers_fields_hash.has_key? field_name
if !field.default.nil?
field_val = read_default_value(field.type, field.default)
read_record[field.name] = field_val
else
@@ -478,9 +452,80 @@
end
return read_record
else
fail_msg = "Unknown type: #{field_schema.type}"
raise AvroError(fail_msg)
+ end
+ end
+
+ def skip_data(writers_schema, decoder)
+ case writers_schema.type
+ when 'null'
+ decoder.skip_null
+ when 'boolean'
+ decoder.skip_boolean
+ when 'string'
+ decoder.skip_string
+ when 'int'
+ decoder.skip_int
+ when 'long'
+ decoder.skip_long
+ when 'float'
+ decoder.skip_float
+ when 'double'
+ decoder.skip_double
+ when 'bytes'
+ decoder.skip_bytes
+ when 'fixed'
+ skip_fixed(writers_schema, decoder)
+ when 'enum'
+ skip_enum(writers_schema, decoder)
+ when 'array'
+ skip_array(writers_schema, decoder)
+ when 'map'
+ skip_map(writers_schema, decoder)
+ when 'union'
+ skip_union(writers_schema, decoder)
+ when 'record', 'error', 'request'
+ skip_record(writers_schema, decoder)
+ else
+ raise AvroError, "Unknown schema type: #{schm.type}"
+ end
+ end
+
+ def skip_fixed(writers_schema, decoder)
+ decoder.skip(writers_schema.size)
+ end
+
+ def skip_enum(writers_schema, decoder)
+ decoder.skip_int
+ end
+
+ def skip_array(writers_schema, decoder)
+ skip_blocks(decoder) { skip_data(writers_schema.items, decoder) }
+ end
+
+ def skip_map(writers_schema, decoder)
+ skip_blocks(decoder) {
+ decoder.skip_string
+ skip_data(writers_schema.values, decoder)
+ }
+ end
+
+ def skip_record(writers_schema, decoder)
+ writers_schema.fields.each{|f| skip_data(f.type, decoder) }
+ end
+
+ private
+ def skip_blocks(decoder, &blk)
+ block_count = decoder.read_long
+ while block_count != 0
+ if block_count < 0
+ decoder.skip(decoder.read_long)
+ else
+ block_count.times &blk
+ end
+ block_count = decoder.read_long
end
end
end # DatumReader
# DatumWriter for generic ruby objects