lib/avro/data_file.rb in avro-1.7.4 vs lib/avro/data_file.rb in avro-1.7.5
- removed in 1.7.5 (old)
+ added in 1.7.5 (new)
@@ -22,23 +22,22 @@
MAGIC = "Obj" + [VERSION].pack('c')
MAGIC_SIZE = MAGIC.size
SYNC_SIZE = 16
SYNC_INTERVAL = 1000 * SYNC_SIZE
META_SCHEMA = Schema.parse('{"type": "map", "values": "bytes"}')
- VALID_CODECS = ['null']
VALID_ENCODINGS = ['binary'] # not used yet
class DataFileError < AvroError; end
- def self.open(file_path, mode='r', schema=nil)
+ def self.open(file_path, mode='r', schema=nil, codec=nil)
schema = Avro::Schema.parse(schema) if schema
case mode
when 'w'
unless schema
raise DataFileError, "Writing an Avro file requires a schema."
end
- io = open_writer(File.open(file_path, 'wb'), schema)
+ io = open_writer(File.open(file_path, 'wb'), schema, codec)
when 'r'
io = open_reader(File.open(file_path, 'rb'), schema)
else
raise DataFileError, "Only modes 'r' and 'w' allowed. You gave #{mode.inspect}."
end
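
A minimal usage sketch of the new fourth argument (the file name and schema below are illustrative, not part of the library):

    require 'avro'

    schema_json = '{"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}]}'

    # 'deflate' and 'null' are the codec names registered at the bottom of this
    # file; passing nil (or omitting the argument) falls back to the 'null' codec.
    Avro::DataFile.open('users.avro', 'w', schema_json, 'deflate') do |writer|
      writer << {'name' => 'alice'}
    end
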
@@ -47,15 +46,38 @@
io
ensure
io.close if block_given? && io
end
+ def self.codecs
+ @codecs
+ end
+
+ def self.register_codec(codec)
+ @codecs ||= {}
+ codec = codec.new if !codec.respond_to?(:codec_name) && codec.is_a?(Class)
+ @codecs[codec.codec_name.to_s] = codec
+ end
+
+ def self.get_codec(codec)
+ codec ||= 'null'
+ if codec.respond_to?(:compress) && codec.respond_to?(:decompress)
+ codec # it's a codec instance
+ elsif codec.is_a?(Class)
+ codec.new # it's a codec class
+ elsif @codecs.include?(codec.to_s)
+ @codecs[codec.to_s] # it's a string or symbol (codec name)
+ else
+ raise DataFileError, "Unknown codec: #{codec.inspect}"
+ end
+ end
+
class << self
private
- def open_writer(file, schema)
+ def open_writer(file, schema, codec=nil)
writer = Avro::IO::DatumWriter.new(schema)
- Avro::DataFile::Writer.new(file, writer, schema)
+ Avro::DataFile::Writer.new(file, writer, schema, codec)
end
def open_reader(file, schema)
reader = Avro::IO::DatumReader.new(nil, schema)
Avro::DataFile::Reader.new(file, reader)
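
The registry above accepts either a codec class or a codec instance, and get_codec also resolves names given as strings or symbols. A sketch with a hypothetical IdentityCodec (the class name is illustrative, not part of the library):

    require 'avro'

    # Any object responding to codec_name, compress and decompress qualifies.
    class IdentityCodec
      def codec_name; 'identity'; end
      def compress(data); data; end
      def decompress(data); data; end
    end

    Avro::DataFile.register_codec(IdentityCodec)  # classes are instantiated on registration
    Avro::DataFile.get_codec('identity')          # => the registered IdentityCodec instance
    Avro::DataFile.get_codec(:deflate)            # names may be strings or symbols
    Avro::DataFile.get_codec(nil)                 # nil falls back to the 'null' codec
    Avro::DataFile.get_codec(Avro::DataFile::DeflateCodec.new(9))  # instances pass through as-is

Note that only codec names defined by the Avro specification ('null', 'deflate', and optionally 'snappy') are portable; files written with a custom codec like the one above will not be readable by other Avro implementations.
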
@@ -65,14 +87,14 @@
class Writer
def self.generate_sync_marker
OpenSSL::Random.random_bytes(16)
end
- attr_reader :writer, :encoder, :datum_writer, :buffer_writer, :buffer_encoder, :sync_marker, :meta
+ attr_reader :writer, :encoder, :datum_writer, :buffer_writer, :buffer_encoder, :sync_marker, :meta, :codec
attr_accessor :block_count
- def initialize(writer, datum_writer, writers_schema=nil)
+ def initialize(writer, datum_writer, writers_schema=nil, codec=nil)
# If writers_schema is not present, presume we're appending
@writer = writer
@encoder = IO::BinaryEncoder.new(@writer)
@datum_writer = datum_writer
@buffer_writer = StringIO.new('', 'w')
@@ -81,11 +103,12 @@
@meta = {}
if writers_schema
@sync_marker = Writer.generate_sync_marker
- meta['avro.codec'] = 'null'
+ @codec = DataFile.get_codec(codec)
+ meta['avro.codec'] = @codec.codec_name.to_s
meta['avro.schema'] = writers_schema.to_s
datum_writer.writers_schema = writers_schema
write_header
else
# open writer for reading to collect metadata
@@ -93,10 +116,11 @@
# FIXME(jmhodges): collect arbitrary metadata
# collect metadata
@sync_marker = dfr.sync_marker
meta['avro.codec'] = dfr.meta['avro.codec']
+ @codec = DataFile.get_codec(meta['avro.codec'])
# get schema used to write existing file
schema_from_file = dfr.meta['avro.schema']
meta['avro.schema'] = schema_from_file
datum_writer.writers_schema = Schema.parse(schema_from_file)
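
When appending (no writers_schema given), the codec is now recovered from the existing file's metadata instead of being assumed to be 'null'. Since DataFile.open only understands the 'r' and 'w' modes, appending means constructing the Writer directly; a sketch, reusing the hypothetical users.avro from above:

    require 'avro'

    file         = File.open('users.avro', 'rb+')  # read/write without truncating
    datum_writer = Avro::IO::DatumWriter.new
    appender     = Avro::DataFile::Writer.new(file, datum_writer)  # schema and codec read from the file header
    appender << {'name' => 'bob'}
    appender.close
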
@@ -150,25 +174,19 @@
writer.write(sync_marker)
end
# TODO(jmhodges): make a schema for blocks and use datum_writer
# TODO(jmhodges): do we really need the number of items in the block?
- # TODO(jmhodges): use codec when writing the block contents
def write_block
if block_count > 0
# write number of items in block and block size in bytes
encoder.write_long(block_count)
- to_write = buffer_writer.string
+ to_write = codec.compress(buffer_writer.string)
encoder.write_long(to_write.size)
# write block contents
- if meta['avro.codec'] == 'null'
- writer.write(to_write)
- else
- msg = "#{meta['avro.codec'].inspect} coded is not supported"
- raise DataFileError, msg
- end
+ writer.write(to_write)
# write sync marker
writer.write(sync_marker)
# reset buffer
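
With this change the block size written to disk is the compressed size, so each block now has roughly the following layout:

    # long   count        number of datums in the block
    # long   size         byte length of codec.compress(buffer contents)
    # bytes  payload      the (possibly compressed) block contents
    # bytes  sync marker  16 raw bytes matching the file header's marker
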
@@ -181,26 +199,28 @@
# Read files written by DataFileWriter
class Reader
include ::Enumerable
- attr_reader :reader, :decoder, :datum_reader, :sync_marker, :meta, :file_length
- attr_accessor :block_count
+ # The reader and binary decoder for the raw file stream
+ attr_reader :reader, :decoder
+ # The binary decoder for the contents of a block (after codec decompression)
+ attr_reader :block_decoder
+
+ attr_reader :datum_reader, :sync_marker, :meta, :file_length, :codec
+ attr_accessor :block_count # records remaining in current block
+
def initialize(reader, datum_reader)
@reader = reader
@decoder = IO::BinaryDecoder.new(reader)
@datum_reader = datum_reader
# read the header: magic, meta, sync
read_header
- # ensure the codec is valid
- codec_from_file = meta['avro.codec']
- if codec_from_file && ! VALID_CODECS.include?(codec_from_file)
- raise DataFileError, "Unknown codec: #{codec_from_file}"
- end
+ @codec = DataFile.get_codec(meta['avro.codec'])
# get ready to read
@block_count = 0
datum_reader.writers_schema = Schema.parse meta['avro.schema']
end
@@ -218,11 +238,11 @@
else
read_block_header
end
end
- datum = datum_reader.read(decoder)
+ datum = datum_reader.read(block_decoder)
self.block_count -= 1
yield(datum)
end
end
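
On the read side decompression is transparent to callers; iterating over a compressed file looks the same as before (again using the hypothetical users.avro):

    require 'avro'

    Avro::DataFile.open('users.avro') do |reader|  # default mode is 'r'
      reader.each { |record| puts record['name'] }
    end
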
@@ -255,11 +275,13 @@
@sync_marker = reader.read(SYNC_SIZE)
end
def read_block_header
self.block_count = decoder.read_long
- decoder.read_long # not doing anything with length in bytes
+ block_bytes = decoder.read_long
+ data = codec.decompress(reader.read(block_bytes))
+ @block_decoder = IO::BinaryDecoder.new(StringIO.new(data))
end
# read the length of the sync marker; if it matches the sync
# marker, return true. Otherwise, seek back to where we started
# and return false
@@ -271,7 +293,50 @@
else
true
end
end
end
+
+
+ class NullCodec
+ def codec_name; 'null'; end
+ def decompress(data); data; end
+ def compress(data); data; end
+ end
+
+ class DeflateCodec
+ attr_reader :level
+
+ def initialize(level=Zlib::DEFAULT_COMPRESSION)
+ @level = level
+ end
+
+ def codec_name; 'deflate'; end
+
+ def decompress(compressed)
+ # Passing a negative number to Inflate puts it into "raw" RFC1951 mode
+ # (without the RFC1950 header & checksum). See the docs for
+ # inflateInit2 in http://www.zlib.net/manual.html
+ zstream = Zlib::Inflate.new(-Zlib::MAX_WBITS)
+ data = zstream.inflate(compressed)
+ data << zstream.finish
+ ensure
+ zstream.close
+ end
+
+ def compress(data)
+ zstream = Zlib::Deflate.new(level, -Zlib::MAX_WBITS)
+ compressed = zstream.deflate(data)
+ compressed << zstream.finish
+ ensure
+ zstream.close
+ end
+ end
+
+ DataFile.register_codec NullCodec
+ DataFile.register_codec DeflateCodec
+
+ # TODO this constant won't be updated if you register another codec.
+ # Deprecated in favor of Avro::DataFile::codecs
+ VALID_CODECS = DataFile.codecs.keys
end
end
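
Finally, a quick sanity-check sketch of the two bundled codecs round-tripping data (not part of the library's test suite; the payload is illustrative):

    require 'avro'
    require 'zlib'

    deflate = Avro::DataFile::DeflateCodec.new
    null    = Avro::DataFile::NullCodec.new

    payload    = 'some repetitive payload ' * 100
    compressed = deflate.compress(payload)

    deflate.decompress(compressed) == payload                 # => true
    compressed.bytesize < payload.bytesize                    # => true for repetitive input
    null.decompress(null.compress(payload)).equal?(payload)   # => true; NullCodec passes data through untouched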