require "helper"
require "tempfile"
require "fluent/msgpack_factory"
require "fluent/plugin/buffer/arrow_memory_chunk"

# Exercises Fluent::Plugin::Buffer::ArrowMemoryChunk: events appended as
# msgpack must come back as Arrow record batches, both through #read
# (in-memory string) and through #write_to (IO target).
class ArrowMemoryChunkTest < Test::Unit::TestCase
  setup do
    @fields = [
      Arrow::Field.new("key1", :uint64),
      Arrow::Field.new("key2", :double),
      Arrow::Field.new("key3", Arrow::TimestampDataType.new(:second)),
    ]
    @schema = Arrow::Schema.new(@fields)
    @c = Fluent::Plugin::Buffer::ArrowMemoryChunk.new(Object.new, @schema)
  end

  test "can #read" do
    append_sample_events

    Arrow::BufferInputStream.open(Arrow::Buffer.new(@c.read)) do |input|
      assert_sample_record_batches(Arrow::RecordBatchFileReader.new(input))
    end
  end

  test "can #write_to" do
    append_sample_events

    Tempfile.create do |tf|
      @c.write_to(tf)
      # Flush before mapping the file by path so the reader sees the bytes
      # written through the still-open handle.
      tf.flush

      Arrow::MemoryMappedInputStream.open(tf.path) do |input|
        assert_sample_record_batches(Arrow::RecordBatchFileReader.new(input))
      end
    end
  end

  private

  # NOTE(review): not called by any test visible in this file, and neither the
  # formatter test driver nor ArrowFormatter is required here -- presumably a
  # leftover from a formatter test; confirm before removing.
  def create_driver(conf)
    Fluent::Test::Driver::Formatter.new(Fluent::Plugin::ArrowFormatter).configure(conf)
  end

  # Appends two msgpack-encoded events matching @schema to the chunk under
  # test. The key1 values (123, 124) are what assert_sample_record_batches
  # expects to read back.
  def append_sample_events
    events = [
      {"key1" => 123, "key2" => 10.1234, "key3" => Fluent::EventTime.from_time(Time.now)},
      {"key1" => 124, "key2" => 11.1234, "key3" => Fluent::EventTime.from_time(Time.now)},
    ]
    @c.append(events.map(&:to_msgpack))
  end

  # Asserts that every record batch yielded by +reader+ holds the two sample
  # rows with key1 stored as an unsigned 64-bit column, and that at least one
  # batch was yielded (otherwise the per-batch assertions would never run and
  # the test would pass vacuously).
  def assert_sample_record_batches(reader)
    key1_name = @fields[0].name
    batch_count = 0

    reader.each do |record_batch|
      batch_count += 1
      key1_column = record_batch.find_column(key1_name)

      assert { record_batch.n_rows == 2 }
      assert { key1_column.class == Arrow::UInt64Array }
      assert { key1_column.values == [123, 124] }
    end

    assert { batch_count > 0 }
  end
end