lib/fluent/plugin/buffer/arrow_memory_chunk.rb in fluent-plugin-arrow-0.0.2 vs lib/fluent/plugin/buffer/arrow_memory_chunk.rb in fluent-plugin-arrow-0.0.3

- old
+ new

@@ -16,23 +16,25 @@
 require 'arrow'
 require 'parquet'
 require 'fluent/msgpack_factory'
 require 'fluent/plugin/buffer/chunk'
 require 'fluent/plugin/buffer/memory_chunk'
+require 'fluent/plugin/buffer/arrow_buffer_string_builder'
 require 'fluent/plugin/arrow/field_wrapper'

 module Fluent
   module Plugin
     class Buffer
       class ArrowMemoryChunk < MemoryChunk
+        include ArrowBufferStringBuilder
+
         def initialize(metadata, schema, field_wrappers, chunk_size: 1024, format: :arrow)
           super(metadata, compress: :text)
           @schema = schema
           @field_wrappers = field_wrappers
           @chunk_size = chunk_size
           @format = format
-          @unpacker = Fluent::MessagePackFactory.engine_factory.unpacker
         end

         def read(**kwargs)
           build_arrow_buffer_string
         end
@@ -46,37 +48,11 @@
           io.write build_arrow_buffer_string
         end

         private
-        def build_arrow_buffer_string
-          count = 0
-          @unpacker.feed_each(@chunk) do |record|
-            count += 1
-            record.each do |k, v|
-              @field_wrappers[k].append(v)
-            end
-          end
-          arrow_buf = ::Arrow::ResizableBuffer.new(@chunk_bytes * 1.2)
-
-          ::Arrow::BufferOutputStream.open(arrow_buf) do |output|
-            if @format == :parquet
-              Parquet::ArrowFileWriter.open(@schema, output) do |writer|
-                columns = @schema.fields.map do |f|
-                  ::Arrow::Column.new(f, @field_wrappers[f.name].finish)
-                end
-                table = ::Arrow::Table.new(@schema, columns)
-                writer.write_table(table, @chunk_size)
-              end
-            else
-              ::Arrow::RecordBatchFileWriter.open(output, @schema) do |writer|
-                record_batch = ::Arrow::RecordBatch.new(@schema, count, @field_wrappers.values.map(&:finish))
-                writer.write_record_batch(record_batch)
-              end
-            end
-          end
-
-          arrow_buf.data.to_s
+        def each_record(&block)
+          Fluent::MessagePackFactory.engine_factory.unpacker.feed_each(@chunk, &block)
         end
       end
     end
   end
 end
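
The change between 0.0.2 and 0.0.3 is an extract-to-mixin refactoring: the Arrow/Parquet serialization that lived in ArrowMemoryChunk#build_arrow_buffer_string moves into the shared ArrowBufferStringBuilder module, and the chunk class now only supplies each_record, the one piece that differs between chunk implementations. The mixin itself is not part of this diff; what follows is a minimal sketch of what it plausibly contains, reconstructed from the method removed above, not the verbatim 0.0.3 source. The instance state it reads (@schema, @field_wrappers, @chunk_size, @format, @chunk_bytes) is assumed to be provided by the including chunk, exactly as in the old in-class version.

# Sketch of lib/fluent/plugin/buffer/arrow_buffer_string_builder.rb,
# reconstructed from the removed method; not the verbatim 0.0.3 source.
require 'arrow'
require 'parquet'

module Fluent
  module Plugin
    class Buffer
      module ArrowBufferStringBuilder
        private

        # Drains every record from the chunk (via the including class's
        # #each_record), appends each value into the per-field Arrow
        # builders, and returns the finished Arrow IPC file or Parquet
        # file as a binary String.
        def build_arrow_buffer_string
          count = 0
          each_record do |record|
            count += 1
            record.each do |k, v|
              @field_wrappers[k].append(v)
            end
          end

          # Over-allocate by 20%; the buffer is resizable if this falls short.
          arrow_buf = ::Arrow::ResizableBuffer.new(@chunk_bytes * 1.2)

          ::Arrow::BufferOutputStream.open(arrow_buf) do |output|
            if @format == :parquet
              Parquet::ArrowFileWriter.open(@schema, output) do |writer|
                columns = @schema.fields.map do |f|
                  ::Arrow::Column.new(f, @field_wrappers[f.name].finish)
                end
                table = ::Arrow::Table.new(@schema, columns)
                writer.write_table(table, @chunk_size)
              end
            else
              ::Arrow::RecordBatchFileWriter.open(output, @schema) do |writer|
                record_batch = ::Arrow::RecordBatch.new(@schema, count, @field_wrappers.values.map(&:finish))
                writer.write_record_batch(record_batch)
              end
            end
          end

          arrow_buf.data.to_s
        end
      end
    end
  end
end

With the serialization behind this seam, another chunk implementation (for example a file-backed one) can reuse the identical Arrow/Parquet writing logic by defining its own each_record over its payload. That is presumably also why the unpacker is no longer cached in the constructor: the new each_record builds a fresh unpacker per call, keeping the mixin free of per-instance state.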