lib/avro/data_file.rb in avro-1.7.4 vs lib/avro/data_file.rb in avro-1.7.5

- old
+ new

@@ -22,23 +22,22 @@
     MAGIC = "Obj" + [VERSION].pack('c')
     MAGIC_SIZE = MAGIC.size
     SYNC_SIZE = 16
     SYNC_INTERVAL = 1000 * SYNC_SIZE
     META_SCHEMA = Schema.parse('{"type": "map", "values": "bytes"}')
-    VALID_CODECS = ['null']
     VALID_ENCODINGS = ['binary'] # not used yet

     class DataFileError < AvroError; end

-    def self.open(file_path, mode='r', schema=nil)
+    def self.open(file_path, mode='r', schema=nil, codec=nil)
       schema = Avro::Schema.parse(schema) if schema
       case mode
       when 'w'
         unless schema
           raise DataFileError, "Writing an Avro file requires a schema."
         end
-        io = open_writer(File.open(file_path, 'wb'), schema)
+        io = open_writer(File.open(file_path, 'wb'), schema, codec)
       when 'r'
         io = open_reader(File.open(file_path, 'rb'), schema)
       else
         raise DataFileError, "Only modes 'r' and 'w' allowed. You gave #{mode.inspect}."
       end
@@ -47,15 +46,38 @@
       io
     ensure
       io.close if block_given? && io
     end

+    def self.codecs
+      @codecs
+    end
+
+    def self.register_codec(codec)
+      @codecs ||= {}
+      codec = codec.new if !codec.respond_to?(:codec_name) && codec.is_a?(Class)
+      @codecs[codec.codec_name.to_s] = codec
+    end
+
+    def self.get_codec(codec)
+      codec ||= 'null'
+      if codec.respond_to?(:compress) && codec.respond_to?(:decompress)
+        codec # it's a codec instance
+      elsif codec.is_a?(Class)
+        codec.new # it's a codec class
+      elsif @codecs.include?(codec.to_s)
+        @codecs[codec.to_s] # it's a string or symbol (codec name)
+      else
+        raise DataFileError, "Unknown codec: #{codec.inspect}"
+      end
+    end
+
     class << self
       private

-      def open_writer(file, schema)
+      def open_writer(file, schema, codec=nil)
         writer = Avro::IO::DatumWriter.new(schema)
-        Avro::DataFile::Writer.new(file, writer, schema)
+        Avro::DataFile::Writer.new(file, writer, schema, codec)
       end

       def open_reader(file, schema)
         reader = Avro::IO::DatumReader.new(nil, schema)
         Avro::DataFile::Reader.new(file, reader)
@@ -65,14 +87,14 @@
     class Writer
       def self.generate_sync_marker
         OpenSSL::Random.random_bytes(16)
       end

-      attr_reader :writer, :encoder, :datum_writer, :buffer_writer, :buffer_encoder, :sync_marker, :meta
+      attr_reader :writer, :encoder, :datum_writer, :buffer_writer, :buffer_encoder, :sync_marker, :meta, :codec
       attr_accessor :block_count

-      def initialize(writer, datum_writer, writers_schema=nil)
+      def initialize(writer, datum_writer, writers_schema=nil, codec=nil)
         # If writers_schema is not present, presume we're appending
         @writer = writer
         @encoder = IO::BinaryEncoder.new(@writer)
         @datum_writer = datum_writer
         @buffer_writer = StringIO.new('', 'w')
@@ -81,11 +103,12 @@
         @meta = {}

         if writers_schema
           @sync_marker = Writer.generate_sync_marker
-          meta['avro.codec'] = 'null'
+          @codec = DataFile.get_codec(codec)
+          meta['avro.codec'] = @codec.codec_name.to_s
           meta['avro.schema'] = writers_schema.to_s
           datum_writer.writers_schema = writers_schema
           write_header
         else
           # open writer for reading to collect metadata
@@ -93,10 +116,11 @@
           # FIXME(jmhodges): collect arbitrary metadata
           # collect metadata
           @sync_marker = dfr.sync_marker
           meta['avro.codec'] = dfr.meta['avro.codec']
+          @codec = DataFile.get_codec(meta['avro.codec'])

           # get schema used to write existing file
           schema_from_file = dfr.meta['avro.schema']
           meta['avro.schema'] = schema_from_file
           datum_writer.writers_schema = Schema.parse(schema_from_file)
@@ -150,25 +174,19 @@
         writer.write(sync_marker)
       end

       # TODO(jmhodges): make a schema for blocks and use datum_writer
       # TODO(jmhodges): do we really need the number of items in the block?
-      # TODO(jmhodges): use codec when writing the block contents
       def write_block
         if block_count > 0
           # write number of items in block and block size in bytes
           encoder.write_long(block_count)
-          to_write = buffer_writer.string
+          to_write = codec.compress(buffer_writer.string)
           encoder.write_long(to_write.size)

           # write block contents
-          if meta['avro.codec'] == 'null'
-            writer.write(to_write)
-          else
-            msg = "#{meta['avro.codec'].inspect} coded is not supported"
-            raise DataFileError, msg
-          end
+          writer.write(to_write)

           # write sync marker
           writer.write(sync_marker)

           # reset buffer
@@ -181,26 +199,28 @@
     # Read files written by DataFileWriter
     class Reader
       include ::Enumerable

-      attr_reader :reader, :decoder, :datum_reader, :sync_marker, :meta, :file_length
-      attr_accessor :block_count
+      # The reader and binary decoder for the raw file stream
+      attr_reader :reader, :decoder
+      # The binary decoder for the contents of a block (after codec decompression)
+      attr_reader :block_decoder
+
+      attr_reader :datum_reader, :sync_marker, :meta, :file_length, :codec
+      attr_accessor :block_count # records remaining in current block
+
       def initialize(reader, datum_reader)
         @reader = reader
         @decoder = IO::BinaryDecoder.new(reader)
         @datum_reader = datum_reader

         # read the header: magic, meta, sync
         read_header

-        # ensure the codec is valid
-        codec_from_file = meta['avro.codec']
-        if codec_from_file && ! VALID_CODECS.include?(codec_from_file)
-          raise DataFileError, "Unknown codec: #{codec_from_file}"
-        end
+        @codec = DataFile.get_codec(meta['avro.codec'])

         # get ready to read
         @block_count = 0
         datum_reader.writers_schema = Schema.parse meta['avro.schema']
       end
@@ -218,11 +238,11 @@
             else
               read_block_header
             end
           end

-          datum = datum_reader.read(decoder)
+          datum = datum_reader.read(block_decoder)
           self.block_count -= 1
           yield(datum)
         end
       end
@@ -255,11 +275,13 @@
         @sync_marker = reader.read(SYNC_SIZE)
       end

       def read_block_header
         self.block_count = decoder.read_long
-        decoder.read_long # not doing anything with length in bytes
+        block_bytes = decoder.read_long
+        data = codec.decompress(reader.read(block_bytes))
+        @block_decoder = IO::BinaryDecoder.new(StringIO.new(data))
       end

       # read the length of the sync marker; if it matches the sync
       # marker, return true. Otherwise, seek back to where we started
       # and return false
@@ -271,7 +293,50 @@
         else
           true
         end
       end
     end
+
+
+    class NullCodec
+      def codec_name; 'null'; end
+      def decompress(data); data; end
+      def compress(data); data; end
+    end
+
+    class DeflateCodec
+      attr_reader :level
+
+      def initialize(level=Zlib::DEFAULT_COMPRESSION)
+        @level = level
+      end
+
+      def codec_name; 'deflate'; end
+
+      def decompress(compressed)
+        # Passing a negative number to Inflate puts it into "raw" RFC1951 mode
+        # (without the RFC1950 header & checksum). See the docs for
+        # inflateInit2 in http://www.zlib.net/manual.html
+        zstream = Zlib::Inflate.new(-Zlib::MAX_WBITS)
+        data = zstream.inflate(compressed)
+        data << zstream.finish
+      ensure
+        zstream.close
+      end

+      def compress(data)
+        zstream = Zlib::Deflate.new(level, -Zlib::MAX_WBITS)
+        compressed = zstream.deflate(data)
+        compressed << zstream.finish
+      ensure
+        zstream.close
+      end
+    end
+
+    DataFile.register_codec NullCodec
+    DataFile.register_codec DeflateCodec
+
+    # TODO this constant won't be updated if you register another codec.
+    # Deprecated in favor of Avro::DataFile::codecs
+    VALID_CODECS = DataFile.codecs.keys
   end
 end
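The net effect of the diff is that Avro::DataFile.open gains an optional codec argument and a small codec registry (DataFile.codecs, register_codec, get_codec), with 'null' and 'deflate' registered out of the box. The sketch below shows how the new API might be used; it is not part of the diff. The schema, file names, record values and the ZlibCodec are made-up examples, and it assumes the parts of the 1.7.x API not shown in these hunks (Avro::Schema.parse of a JSON string, Writer#<< appending a datum, the block form of open closing the file) behave as in earlier releases.

require 'avro'
require 'zlib'

# Made-up example schema, for illustration only.
SCHEMA = <<-JSON
{"type": "record", "name": "User", "fields": [
  {"name": "name", "type": "string"},
  {"name": "age",  "type": "int"}
]}
JSON

# Write a deflate-compressed container file. The new fourth argument accepts a
# codec name ('null' or 'deflate'), a codec class, or a codec instance -- see
# DataFile.get_codec in the diff above.
Avro::DataFile.open('users.avro', 'w', SCHEMA, 'deflate') do |writer|
  writer << {'name' => 'alice', 'age' => 30}
  writer << {'name' => 'bob',   'age' => 25}
end

# Reading takes no codec argument: the Reader resolves the codec from the
# 'avro.codec' entry in the file metadata via DataFile.get_codec.
Avro::DataFile.open('users.avro') do |reader|
  reader.each { |user| p user }
end

# The codec argument can also be an instance, e.g. a DeflateCodec with a
# non-default compression level:
Avro::DataFile.open('users.best.avro', 'w', SCHEMA,
                    Avro::DataFile::DeflateCodec.new(Zlib::BEST_COMPRESSION)) do |writer|
  writer << {'name' => 'carol', 'age' => 41}
end

# A custom codec only needs codec_name, compress and decompress. This
# hypothetical ZlibCodec (RFC1950 zlib framing, unlike the raw-deflate codec
# shipped with the gem) is NOT a standard Avro codec.
class ZlibCodec
  def codec_name; 'zlib'; end
  def compress(data); Zlib::Deflate.deflate(data); end
  def decompress(compressed); Zlib::Inflate.inflate(compressed); end
end

Avro::DataFile.register_codec(ZlibCodec)

Because the Reader looks the codec up by the name stored in the file's avro.codec metadata, a file written with a non-standard codec such as the ZlibCodec above can only be read by a process that has registered a codec under the same name; only 'null' and 'deflate' are registered by default.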