lib/avro/io.rb in avro-1.3.0 vs lib/avro/io.rb in avro-1.3.3

- old
+ new

@@ -74,33 +74,19 @@ def read_float # A float is written as 4 bytes. # The float is converted into a 32-bit integer using a method # equivalent to Java's floatToIntBits and then encoded in # little-endian format. - - bits = (byte! & 0xFF) | - ((byte! & 0xff) << 8) | - ((byte! & 0xff) << 16) | - ((byte! & 0xff) << 24) - [bits].pack('i').unpack('e')[0] + @reader.read(4).unpack('e')[0] end def read_double # A double is written as 8 bytes. # The double is converted into a 64-bit integer using a method # equivalent to Java's doubleToLongBits and then encoded in # little-endian format. - - bits = (byte! & 0xFF) | - ((byte! & 0xff) << 8) | - ((byte! & 0xff) << 16) | - ((byte! & 0xff) << 24) | - ((byte! & 0xff) << 32) | - ((byte! & 0xff) << 40) | - ((byte! & 0xff) << 48) | - ((byte! & 0xff) << 56) - [bits].pack('Q').unpack('d')[0] + @reader.read(8).unpack('E')[0] end def read_bytes # Bytes are encoded as a long followed by that many bytes of # data. @@ -200,31 +186,19 @@ # A float is written as 4 bytes. # The float is converted into a 32-bit integer using a method # equivalent to Java's floatToIntBits and then encoded in # little-endian format. def write_float(datum) - bits = [datum].pack('e').unpack('i')[0] - @writer.write(((bits ) & 0xFF).chr) - @writer.write(((bits >> 8 ) & 0xFF).chr) - @writer.write(((bits >> 16) & 0xFF).chr) - @writer.write(((bits >> 24) & 0xFF).chr) + @writer.write([datum].pack('e')) end # A double is written as 8 bytes. # The double is converted into a 64-bit integer using a method # equivalent to Java's doubleToLongBits and then encoded in # little-endian format. def write_double(datum) - bits = [datum].pack('d').unpack('Q')[0] - @writer.write(((bits ) & 0xFF).chr) - @writer.write(((bits >> 8 ) & 0xFF).chr) - @writer.write(((bits >> 16) & 0xFF).chr) - @writer.write(((bits >> 24) & 0xFF).chr) - @writer.write(((bits >> 32) & 0xFF).chr) - @writer.write(((bits >> 40) & 0xFF).chr) - @writer.write(((bits >> 48) & 0xFF).chr) - @writer.write(((bits >> 56) & 0xFF).chr) + @writer.write([datum].pack('E')) end # Bytes are encoded as a long followed by that many bytes of data. def write_bytes(datum) write_long(datum.size) @@ -245,46 +219,47 @@ end class DatumReader def self.check_props(schema_one, schema_two, prop_list) prop_list.all? do |prop| - schema_one.to_hash[prop] == schema_two.to_hash[prop] + schema_one.send(prop) == schema_two.send(prop) end end def self.match_schemas(writers_schema, readers_schema) w_type = writers_schema.type r_type = readers_schema.type # This conditional is begging for some OO love. - if [w_type, r_type].include? 'union' + if w_type == 'union' || r_type == 'union' return true - elsif Schema::PRIMITIVE_TYPES.include?(w_type) && - Schema::PRIMITIVE_TYPES.include?(r_type) && - w_type == r_type - return true - elsif (w_type == r_type) && (r_type == 'record') && - check_props(writers_schema, readers_schema, ['fullname']) - return true - elsif w_type == r_type && r_type == 'error' && check_props(writers_scheam, readers_schema, ['fullname']) - return true - elsif w_type == r_type && r_type == 'request' - return true - elsif (w_type == r_type) && (r_type == 'fixed') && - check_props(writers_schema, readers_schema, ['fullname', 'size']) - return true - elsif (w_type == r_type) && (r_type == 'enum') && - check_props(writers_schema, readers_schema, ['fullname']) - return true - elsif (w_type == r_type) && (r_type == 'map') && - check_props(writers_schema.values, readers_schema.values, ['type']) - return true - elsif (w_type == r_type) && (r_type == 'array') && - check_props(writers_schema.items, readers_schema.items, ['type']) - return true end + if w_type == r_type + if Schema::PRIMITIVE_TYPES.include?(w_type) && + Schema::PRIMITIVE_TYPES.include?(r_type) + return true + end + + case r_type + when 'record' + return check_props(writers_schema, readers_schema, [:fullname]) + when 'error' + return check_props(writers_scheam, readers_schema, [:fullname]) + when 'request' + return true + when 'fixed' + return check_props(writers_schema, readers_schema, [:fullname, :size]) + when 'enum' + return check_props(writers_schema, readers_schema, [:fullname]) + when 'map' + return check_props(writers_schema.values, readers_schema.values, [:type]) + when 'array' + return check_props(writers_schema.items, readers_schema.items, [:type]) + end + end + # Handle schema promotion if w_type == 'int' && ['long', 'float', 'double'].include?(r_type) return true elsif w_type == 'long' && ['float', 'double'].include?(r_type) return true @@ -422,11 +397,10 @@ # fill in the default values if readers_fields_hash.size > read_record.size writers_fields_hash = writers_schema.fields_hash readers_fields_hash.each do |field_name, field| - unless writers_fields_hash.has_key? field_name if !field.default.nil? field_val = read_default_value(field.type, field.default) read_record[field.name] = field_val else @@ -478,9 +452,80 @@ end return read_record else fail_msg = "Unknown type: #{field_schema.type}" raise AvroError(fail_msg) + end + end + + def skip_data(writers_schema, decoder) + case writers_schema.type + when 'null' + decoder.skip_null + when 'boolean' + decoder.skip_boolean + when 'string' + decoder.skip_string + when 'int' + decoder.skip_int + when 'long' + decoder.skip_long + when 'float' + decoder.skip_float + when 'double' + decoder.skip_double + when 'bytes' + decoder.skip_bytes + when 'fixed' + skip_fixed(writers_schema, decoder) + when 'enum' + skip_enum(writers_schema, decoder) + when 'array' + skip_array(writers_schema, decoder) + when 'map' + skip_map(writers_schema, decoder) + when 'union' + skip_union(writers_schema, decoder) + when 'record', 'error', 'request' + skip_record(writers_schema, decoder) + else + raise AvroError, "Unknown schema type: #{schm.type}" + end + end + + def skip_fixed(writers_schema, decoder) + decoder.skip(writers_schema.size) + end + + def skip_enum(writers_schema, decoder) + decoder.skip_int + end + + def skip_array(writers_schema, decoder) + skip_blocks(decoder) { skip_data(writers_schema.items, decoder) } + end + + def skip_map(writers_schema, decoder) + skip_blocks(decoder) { + decoder.skip_string + skip_data(writers_schema.values, decoder) + } + end + + def skip_record(writers_schema, decoder) + writers_schema.fields.each{|f| skip_data(f.type, decoder) } + end + + private + def skip_blocks(decoder, &blk) + block_count = decoder.read_long + while block_count != 0 + if block_count < 0 + decoder.skip(decoder.read_long) + else + block_count.times &blk + end + block_count = decoder.read_long end end end # DatumReader # DatumWriter for generic ruby objects