lib/fluent/plugin/buffer/arrow_memory_chunk.rb in fluent-plugin-arrow-0.0.1 vs lib/fluent/plugin/buffer/arrow_memory_chunk.rb in fluent-plugin-arrow-0.0.2
- old
+ new
@@ -13,26 +13,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
require 'arrow'
require 'parquet'
+require 'fluent/msgpack_factory'
require 'fluent/plugin/buffer/chunk'
require 'fluent/plugin/buffer/memory_chunk'
+require 'fluent/plugin/arrow/field_wrapper'
module Fluent
module Plugin
class Buffer
class ArrowMemoryChunk < MemoryChunk
- def initialize(metadata, schema, chunk_size: 1024, format: :arrow)
+ def initialize(metadata, schema, field_wrappers, chunk_size: 1024, format: :arrow)
super(metadata, compress: :text)
@schema = schema
+ @field_wrappers = field_wrappers
@chunk_size = chunk_size
@format = format
- @array_builders = {}
- @schema.fields.each do |f|
- @array_builders[f.name] = field_to_array_builder(f)
- end
@unpacker = Fluent::MessagePackFactory.engine_factory.unpacker
end
def read(**kwargs)
build_arrow_buffer_string
@@ -47,51 +46,31 @@
io.write build_arrow_buffer_string
end
private
- def field_to_array_builder(f)
- data_type_str = f.data_type.to_s
- if data_type_str =~ /timestamp/
- return Arrow::TimestampArrayBuilder.new(f.data_type)
- end
-
- data_type_name = data_type_str.capitalize.gsub(/\AUint/, "UInt")
- array_builder_class_name = "#{data_type_name}ArrayBuilder"
- array_builder_class = Arrow.const_get(array_builder_class_name)
- if array_builder_class.method(:new).arity > 0
- array_builder_class.new(f.data_type)
- else
- array_builder_class.new
- end
- end
-
def build_arrow_buffer_string
count = 0
@unpacker.feed_each(@chunk) do |record|
count += 1
record.each do |k, v|
- if v.nil?
- @array_builders[k].append_null
- else
- @array_builders[k].append(v)
- end
+ @field_wrappers[k].append(v)
end
end
- arrow_buf = Arrow::ResizableBuffer.new(@chunk_bytes * 1.2)
+ arrow_buf = ::Arrow::ResizableBuffer.new(@chunk_bytes * 1.2)
- Arrow::BufferOutputStream.open(arrow_buf) do |output|
+ ::Arrow::BufferOutputStream.open(arrow_buf) do |output|
if @format == :parquet
Parquet::ArrowFileWriter.open(@schema, output) do |writer|
columns = @schema.fields.map do |f|
- Arrow::Column.new(f, @array_builders[f.name].finish)
+ ::Arrow::Column.new(f, @field_wrappers[f.name].finish)
end
- table = Arrow::Table.new(@schema, columns)
+ table = ::Arrow::Table.new(@schema, columns)
writer.write_table(table, @chunk_size)
end
else
- Arrow::RecordBatchFileWriter.open(output, @schema) do |writer|
- record_batch = Arrow::RecordBatch.new(@schema, count, @array_builders.values.map(&:finish))
+ ::Arrow::RecordBatchFileWriter.open(output, @schema) do |writer|
+ record_batch = ::Arrow::RecordBatch.new(@schema, count, @field_wrappers.values.map(&:finish))
writer.write_record_batch(record_batch)
end
end
end