Sha256: 8d2126454213f102e42a912b0e394c368253ae529f97bef9bc3c5b0d93d99a75

Contents?: true

Size: 1.67 KB

Versions: 1

Compression:

Stored size: 1.67 KB

Contents

require_relative 'helper'
require 'fluent/test'
require 'fluent/plugin/in_named_pipe'

class NamedPipeInputTest < Test::Unit::TestCase
  TEST_PATH = 'in_named_pipe'

  setup do
    Fluent::Test.setup
  end

  teardown do
    File.unlink(TEST_PATH) rescue nil
  end

  def create_driver(conf)
    Fluent::Test::InputTestDriver.new(Fluent::NamedPipeInput).configure(conf)
  end

  sub_test_case 'configure' do
    test 'required parameters' do
      assert_raise_message("'path' parameter is required") do
        create_driver(%[
          tag foo
        ])
      end

      assert_raise_message("'tag' parameter is required") do
        create_driver(%[
          path #{TEST_PATH}
        ])
      end
    end
  end

  sub_test_case "emit" do
    CONFIG = %[
      path #{TEST_PATH}
      tag named_pipe
      format ltsv
    ]

    test 'read and emit' do
      d = create_driver(CONFIG)
      d.run {
        pipe = ::Fluent::PluginNamedPipe::Fifo.new(TEST_PATH, :w)
        pipe.write "foo:bar\n"
        pipe.flush
      }

      emits = d.emits
      emits.each do |tag, time, record|
        assert_equal("named_pipe", tag)
        assert_equal({"foo"=>"bar\n"}, record)
      end
    end

    
    test 'fragmented emit' do
      d = create_driver(CONFIG)
      d.run {
        pipe = ::Fluent::PluginNamedPipe::Fifo.new(TEST_PATH, :w)
        pipe.write "fo"
        pipe.flush
        sleep 0.2
        pipe.write "o:ba"
        pipe.flush
        sleep 0.2
        pipe.write "r\n"
        pipe.flush
      }

      emits = d.emits
      emits.each do |tag, time, record|
        assert_equal("named_pipe", tag)
        assert_equal({"foo"=>"bar\n"}, record)
      end
    end

    
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-named_pipe-0.2.0 test/test_in_named_pipe.rb