lib/fluent/plugin/buf_arrow_memory.rb in fluent-plugin-arrow-0.0.1 vs lib/fluent/plugin/buf_arrow_memory.rb in fluent-plugin-arrow-0.0.2
- old
+ new
@@ -14,10 +14,11 @@
# limitations under the License.
require "arrow"
require 'fluent/plugin/buffer'
require 'fluent/plugin/buffer/arrow_memory_chunk'
+require 'fluent/plugin/arrow/field_wrapper'
module Fluent
module Plugin
class ArrowMemoryBuffer < Fluent::Plugin::Buffer
Plugin.register_buffer('arrow_memory', self)
@@ -30,44 +31,22 @@
def configure(conf)
super
# [{"name" => foo1, "type" => "uint64"}, {"name" => foo2, "type" => "struct", "fields" => [{"name" => bar1, "type" => "string"}]}
- arrow_fields = @schema.map do |field|
- create_arrow_field(field)
+ @field_wrappers = @schema.each_with_object({}) do |field, h|
+ h[field["name"]] = Fluent::Plugin::Arrow::FieldWrapper.build(field)
end
- @arrow_schema = Arrow::Schema.new(arrow_fields)
+ @arrow_schema = ::Arrow::Schema.new(@field_wrappers.values.map(&:arrow_field))
end
def resume
return {}, []
end
def generate_chunk(metadata)
- Fluent::Plugin::Buffer::ArrowMemoryChunk.new(metadata, @arrow_schema, chunk_size: @row_group_chunk_size, format: @arrow_format)
- end
-
- private
-
- def create_arrow_field(field)
- Arrow::Field.new(field["name"], create_arrow_data_type(field))
- end
-
- def create_arrow_data_type(field)
- case field["type"]
- when "struct"
- Arrow::StructDataType.new(field["fields"].map { |f| create_arrow_field(f) })
- when "list"
- Arrow::ListDataType.new(create_arrow_field(field["value_type"]))
- when "timestamp"
- Arrow::TimestampDataType.new(field["unit"].to_sym)
- else
- data_type_name = field["type"].to_s.capitalize.gsub(/\AUint/, "UInt")
- data_type_class_name = "#{data_type_name}DataType"
- data_type_class = Arrow.const_get(data_type_class_name)
- data_type_class.new
- end
+ Fluent::Plugin::Buffer::ArrowMemoryChunk.new(metadata, @arrow_schema, @field_wrappers, chunk_size: @row_group_chunk_size, format: @arrow_format)
end
end
end
end