lib/fluent/plugin/buffer/arrow_memory_chunk.rb in fluent-plugin-arrow-0.0.2 vs lib/fluent/plugin/buffer/arrow_memory_chunk.rb in fluent-plugin-arrow-0.0.3
- old
+ new
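
Summary of the change: 0.0.3 moves the Arrow/Parquet serialization logic (build_arrow_buffer_string) out of ArrowMemoryChunk into a new shared ArrowBufferStringBuilder mixin. The chunk now contributes only an each_record iterator that feeds its buffered msgpack payload to the builder, and the per-chunk @unpacker instance variable is dropped in favor of a per-call unpacker.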
@@ -16,23 +16,25 @@
require 'arrow'
require 'parquet'
require 'fluent/msgpack_factory'
require 'fluent/plugin/buffer/chunk'
require 'fluent/plugin/buffer/memory_chunk'
+require 'fluent/plugin/buffer/arrow_buffer_string_builder'
require 'fluent/plugin/arrow/field_wrapper'
module Fluent
module Plugin
class Buffer
class ArrowMemoryChunk < MemoryChunk
+ include ArrowBufferStringBuilder
+
def initialize(metadata, schema, field_wrappers, chunk_size: 1024, format: :arrow)
super(metadata, compress: :text)
@schema = schema
@field_wrappers = field_wrappers
@chunk_size = chunk_size
@format = format
- @unpacker = Fluent::MessagePackFactory.engine_factory.unpacker
end
def read(**kwargs)
build_arrow_buffer_string
end
@@ -46,37 +48,11 @@
io.write build_arrow_buffer_string
end
private
- def build_arrow_buffer_string
- count = 0
- @unpacker.feed_each(@chunk) do |record|
- count += 1
- record.each do |k, v|
- @field_wrappers[k].append(v)
- end
- end
- arrow_buf = ::Arrow::ResizableBuffer.new(@chunk_bytes * 1.2)
-
- ::Arrow::BufferOutputStream.open(arrow_buf) do |output|
- if @format == :parquet
- Parquet::ArrowFileWriter.open(@schema, output) do |writer|
- columns = @schema.fields.map do |f|
- ::Arrow::Column.new(f, @field_wrappers[f.name].finish)
- end
- table = ::Arrow::Table.new(@schema, columns)
- writer.write_table(table, @chunk_size)
- end
- else
- ::Arrow::RecordBatchFileWriter.open(output, @schema) do |writer|
- record_batch = ::Arrow::RecordBatch.new(@schema, count, @field_wrappers.values.map(&:finish))
- writer.write_record_batch(record_batch)
- end
- end
- end
-
- arrow_buf.data.to_s
+ def each_record(&block)
+ Fluent::MessagePackFactory.engine_factory.unpacker.feed_each(@chunk, &block)
end
end
end
end
end
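
For reference, below is a minimal sketch of what the extracted fluent/plugin/buffer/arrow_buffer_string_builder.rb plausibly contains, reconstructed from the build_arrow_buffer_string body removed above. The real 0.0.3 module may differ in detail; in particular, using the Chunk#bytesize accessor instead of MemoryChunk's @chunk_bytes instance variable is an assumption made here so the mixin is not tied to memory-backed chunks.

    # Hedged reconstruction, not the gem's verbatim source.
    require 'arrow'
    require 'parquet'

    module Fluent
      module Plugin
        class Buffer
          module ArrowBufferStringBuilder
            private

            # Includers must implement #each_record (yielding decoded record
            # hashes) and provide @schema, @field_wrappers, @chunk_size and
            # @format, as ArrowMemoryChunk does after this change.
            def build_arrow_buffer_string
              count = 0
              each_record do |record|
                count += 1
                record.each do |k, v|
                  @field_wrappers[k].append(v)
                end
              end

              # Over-allocate ~20% so the resizable buffer rarely has to grow;
              # truncate back to an integer byte count. (The removed memory-chunk
              # code used @chunk_bytes * 1.2; bytesize here is an assumption.)
              arrow_buf = ::Arrow::ResizableBuffer.new((bytesize * 1.2).to_i)

              ::Arrow::BufferOutputStream.open(arrow_buf) do |output|
                if @format == :parquet
                  # Parquet path: build a Table from one Column per schema field.
                  Parquet::ArrowFileWriter.open(@schema, output) do |writer|
                    columns = @schema.fields.map do |f|
                      ::Arrow::Column.new(f, @field_wrappers[f.name].finish)
                    end
                    table = ::Arrow::Table.new(@schema, columns)
                    writer.write_table(table, @chunk_size)
                  end
                else
                  # Arrow path: write a single RecordBatch in file format.
                  ::Arrow::RecordBatchFileWriter.open(output, @schema) do |writer|
                    batch = ::Arrow::RecordBatch.new(@schema, count,
                                                     @field_wrappers.values.map(&:finish))
                    writer.write_record_batch(batch)
                  end
                end
              end

              arrow_buf.data.to_s
            end
          end
        end
      end
    end

The design payoff is that serialization is no longer welded to MemoryChunk: any chunk class that includes the mixin and defines its own each_record (as ArrowMemoryChunk now does, feeding @chunk through a per-call msgpack unpacker) gets both output formats without duplicating the writer logic.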