lib/fluent/plugin/buf_arrow_memory.rb in fluent-plugin-arrow-0.0.1 vs lib/fluent/plugin/buf_arrow_memory.rb in fluent-plugin-arrow-0.0.2

- old
+ new

@@ -14,10 +14,11 @@
 # limitations under the License.

 require "arrow"
 require 'fluent/plugin/buffer'
 require 'fluent/plugin/buffer/arrow_memory_chunk'
+require 'fluent/plugin/arrow/field_wrapper'

 module Fluent
   module Plugin
     class ArrowMemoryBuffer < Fluent::Plugin::Buffer
       Plugin.register_buffer('arrow_memory', self)
@@ -30,44 +31,22 @@
       def configure(conf)
         super

         # [{"name" => foo1, "type" => "uint64"}, {"name" => foo2, "type" => "struct", "fields" => [{"name" => bar1, "type" => "string"}]}
-        arrow_fields = @schema.map do |field|
-          create_arrow_field(field)
+        @field_wrappers = @schema.each_with_object({}) do |field, h|
+          h[field["name"]] = Fluent::Plugin::Arrow::FieldWrapper.build(field)
         end
-        @arrow_schema = Arrow::Schema.new(arrow_fields)
+        @arrow_schema = ::Arrow::Schema.new(@field_wrappers.values.map(&:arrow_field))
       end

       def resume
         return {}, []
       end

       def generate_chunk(metadata)
-        Fluent::Plugin::Buffer::ArrowMemoryChunk.new(metadata, @arrow_schema, chunk_size: @row_group_chunk_size, format: @arrow_format)
-      end
-
-      private
-
-      def create_arrow_field(field)
-        Arrow::Field.new(field["name"], create_arrow_data_type(field))
-      end
-
-      def create_arrow_data_type(field)
-        case field["type"]
-        when "struct"
-          Arrow::StructDataType.new(field["fields"].map { |f| create_arrow_field(f) })
-        when "list"
-          Arrow::ListDataType.new(create_arrow_field(field["value_type"]))
-        when "timestamp"
-          Arrow::TimestampDataType.new(field["unit"].to_sym)
-        else
-          data_type_name = field["type"].to_s.capitalize.gsub(/\AUint/, "UInt")
-          data_type_class_name = "#{data_type_name}DataType"
-          data_type_class = Arrow.const_get(data_type_class_name)
-          data_type_class.new
-        end
+        Fluent::Plugin::Buffer::ArrowMemoryChunk.new(metadata, @arrow_schema, @field_wrappers, chunk_size: @row_group_chunk_size, format: @arrow_format)
       end
     end
   end
 end