lib/fluent/plugin/buffer/arrow_memory_chunk.rb in fluent-plugin-arrow-0.0.1 vs lib/fluent/plugin/buffer/arrow_memory_chunk.rb in fluent-plugin-arrow-0.0.2

- old
+ new

@@ -13,26 +13,25 @@ # See the License for the specific language governing permissions and # limitations under the License. require 'arrow' require 'parquet' +require 'fluent/msgpack_factory' require 'fluent/plugin/buffer/chunk' require 'fluent/plugin/buffer/memory_chunk' +require 'fluent/plugin/arrow/field_wrapper' module Fluent module Plugin class Buffer class ArrowMemoryChunk < MemoryChunk - def initialize(metadata, schema, chunk_size: 1024, format: :arrow) + def initialize(metadata, schema, field_wrappers, chunk_size: 1024, format: :arrow) super(metadata, compress: :text) @schema = schema + @field_wrappers = field_wrappers @chunk_size = chunk_size @format = format - @array_builders = {} - @schema.fields.each do |f| - @array_builders[f.name] = field_to_array_builder(f) - end @unpacker = Fluent::MessagePackFactory.engine_factory.unpacker end def read(**kwargs) build_arrow_buffer_string @@ -47,51 +46,31 @@ io.write build_arrow_buffer_string end private - def field_to_array_builder(f) - data_type_str = f.data_type.to_s - if data_type_str =~ /timestamp/ - return Arrow::TimestampArrayBuilder.new(f.data_type) - end - - data_type_name = data_type_str.capitalize.gsub(/\AUint/, "UInt") - array_builder_class_name = "#{data_type_name}ArrayBuilder" - array_builder_class = Arrow.const_get(array_builder_class_name) - if array_builder_class.method(:new).arity > 0 - array_builder_class.new(f.data_type) - else - array_builder_class.new - end - end - def build_arrow_buffer_string count = 0 @unpacker.feed_each(@chunk) do |record| count += 1 record.each do |k, v| - if v.nil? - @array_builders[k].append_null - else - @array_builders[k].append(v) - end + @field_wrappers[k].append(v) end end - arrow_buf = Arrow::ResizableBuffer.new(@chunk_bytes * 1.2) + arrow_buf = ::Arrow::ResizableBuffer.new(@chunk_bytes * 1.2) - Arrow::BufferOutputStream.open(arrow_buf) do |output| + ::Arrow::BufferOutputStream.open(arrow_buf) do |output| if @format == :parquet Parquet::ArrowFileWriter.open(@schema, output) do |writer| columns = @schema.fields.map do |f| - Arrow::Column.new(f, @array_builders[f.name].finish) + ::Arrow::Column.new(f, @field_wrappers[f.name].finish) end - table = Arrow::Table.new(@schema, columns) + table = ::Arrow::Table.new(@schema, columns) writer.write_table(table, @chunk_size) end else - Arrow::RecordBatchFileWriter.open(output, @schema) do |writer| - record_batch = Arrow::RecordBatch.new(@schema, count, @array_builders.values.map(&:finish)) + ::Arrow::RecordBatchFileWriter.open(output, @schema) do |writer| + record_batch = ::Arrow::RecordBatch.new(@schema, count, @field_wrappers.values.map(&:finish)) writer.write_record_batch(record_batch) end end end