Sha256: 68eb1f57010a150713d4f3c3fa2f143816f87d4d943b6fe9216e60160d4840ab

Contents?: true

Size: 1.73 KB

Versions: 2

Compression:

Stored size: 1.73 KB

Contents

#
# Copyright 2018- joker1007
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require "arrow"
require 'fluent/plugin/buffer'
require 'fluent/plugin/buffer/arrow_memory_chunk'
require 'fluent/plugin/arrow/field_wrapper'

module Fluent
  module Plugin
    class ArrowMemoryBuffer < Fluent::Plugin::Buffer
      Plugin.register_buffer('arrow_memory', self)

      config_param :schema, :array
      config_param :arrow_format, :enum, list: [:arrow, :parquet], default: :arrow
      config_param :row_group_chunk_size, :integer, default: 1024

      attr_reader :arrow_schema

      def configure(conf)
        super

        # [{"name" => foo1, "type" => "uint64"}, {"name" => foo2, "type" => "struct", "fields" => [{"name" => bar1, "type" => "string"}]}
        @field_wrappers = @schema.each_with_object({}) do |field, h|
          h[field["name"]] = Fluent::Plugin::Arrow::FieldWrapper.build(field)
        end

        @arrow_schema = ::Arrow::Schema.new(@field_wrappers.values.map(&:arrow_field))
      end

      def resume
        return {}, []
      end

      def generate_chunk(metadata)
        Fluent::Plugin::Buffer::ArrowMemoryChunk.new(metadata, @arrow_schema, @field_wrappers, chunk_size: @row_group_chunk_size, format: @arrow_format)
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
fluent-plugin-arrow-0.0.3 lib/fluent/plugin/buf_arrow_memory.rb
fluent-plugin-arrow-0.0.2 lib/fluent/plugin/buf_arrow_memory.rb