require_relative 'helper' require 'fluent/test' require 'fluent/output' require 'fluent/output_chain' require 'fluent/plugin/buffer' require 'timecop' require 'flexmock/test_unit' module FluentOutputTest include Fluent include FlexMock::TestCase class BufferedOutputTest < ::Test::Unit::TestCase include FluentOutputTest class << self def startup $LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), 'scripts')) require 'fluent/plugin/out_test' require 'fluent/plugin/out_test2' end def shutdown $LOAD_PATH.shift end end def setup Fluent::Test.setup end CONFIG = %[] def create_driver(conf=CONFIG) Fluent::Test::BufferedOutputTestDriver.new(Fluent::BufferedOutput) do def write(chunk) chunk.read end end.configure(conf) end def test_configure # default d = create_driver assert_equal 'memory', d.instance.buffer_type assert_equal 60, d.instance.flush_interval assert_equal false, d.instance.disable_retry_limit assert_equal 17, d.instance.retry_limit assert_equal 1.0, d.instance.retry_wait assert_equal nil, d.instance.max_retry_wait assert_equal 1.0, d.instance.retry_wait assert_equal 1, d.instance.num_threads assert_equal 1, d.instance.queued_chunk_flush_interval # max_retry_wait d = create_driver(CONFIG + %[max_retry_wait 4]) assert_equal 4, d.instance.max_retry_wait # disable_retry_limit d = create_driver(CONFIG + %[disable_retry_limit true]) assert_equal true, d.instance.disable_retry_limit #### retry_state cares it # # retry_wait is converted to Float for calc_retry_wait # d = create_driver(CONFIG + %[retry_wait 1s]) # assert_equal Float, d.instance.retry_wait.class end class FormatterInjectTestOutput < Fluent::Output def initialize super @formatter = nil end end def test_start i = FormatterInjectTestOutput.new i.configure(config_element('ROOT', '', {}, [config_element('inject', '', {'hostname_key' => "host"})])) assert_nothing_raised do i.start end end def create_mock_driver(conf=CONFIG) Fluent::Test::BufferedOutputTestDriver.new(Fluent::BufferedOutput) do attr_accessor :submit_flush_threads def start_mock @started = false start # ensure OutputThread to start successfully submit_flush sleep 0.5 while !@started submit_flush sleep 0.5 end end def try_flush @started = true @submit_flush_threads ||= {} @submit_flush_threads[Thread.current] ||= 0 @submit_flush_threads[Thread.current] += 1 end def write(chunk) chunk.read end end.configure(conf) end def test_secondary d = Fluent::Test::BufferedOutputTestDriver.new(Fluent::BufferedOutput) do def write(chunk) chunk.read end end mock(d.instance.log).warn("Use different plugin for secondary. Check the plugin works with primary like secondary_file", { primary: d.instance.class.to_s, secondary: "Fluent::Plugin::Test2Output" }) d.configure(CONFIG + %[ type test2 name c0 ]) assert_not_nil d.instance.instance_variable_get(:@secondary).router end def test_secondary_with_no_warn_log # ObjectBufferedOutput doesn't implement `custom_filter` d = Fluent::Test::BufferedOutputTestDriver.new(Fluent::ObjectBufferedOutput) mock(d.instance.log).warn("Use different plugin for secondary. Check the plugin works with primary like secondary_file", { primary: d.instance.class.to_s, secondary: "Fluent::Plugin::Test2Output" }).never d.configure(CONFIG + %[ type test2 name c0 ]) assert_not_nil d.instance.instance_variable_get(:@secondary).router end test 'BufferQueueLimitError compatibility' do assert_equal Fluent::Plugin::Buffer::BufferOverflowError, Fluent::BufferQueueLimitError end end class ObjectBufferedOutputTest < ::Test::Unit::TestCase include FluentOutputTest def setup Fluent::Test.setup end CONFIG = %[] def create_driver(conf=CONFIG) Fluent::Test::OutputTestDriver.new(Fluent::ObjectBufferedOutput).configure(conf, true) end def test_configure # default d = create_driver assert_equal true, d.instance.time_as_integer end end class TimeSlicedOutputTest < ::Test::Unit::TestCase include FluentOutputTest include FlexMock::TestCase def setup Fluent::Test.setup FileUtils.rm_rf(TMP_DIR) FileUtils.mkdir_p(TMP_DIR) end TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/tmp/time_sliced_output") CONFIG = %[ buffer_path #{TMP_DIR}/foo time_slice_format %Y%m%d%H ] class TimeSlicedOutputTestPlugin < Fluent::TimeSlicedOutput attr_reader :written_chunk_keys, :errors_in_write def initialize super @written_chunk_keys = [] @errors_in_write = [] end def configure(conf) super @formatter = Fluent::Plugin.new_formatter('out_file') @formatter.configure(conf) end def format(tag, time, record) @formatter.format(tag, time, record) end def write(chunk) @written_chunk_keys << chunk.key true rescue => e @errors_in_write << e end end def create_driver(conf=CONFIG) Fluent::Test::TimeSlicedOutputTestDriver.new(TimeSlicedOutputTestPlugin).configure(conf, true) end data(:none => '', :utc => "utc", :localtime => 'localtime', :timezone => 'timezone +0000') test 'configure with timezone related parameters' do |param| assert_nothing_raised { create_driver(CONFIG + param) } end sub_test_case "test emit" do setup do @time = Time.parse("2011-01-02 13:14:15 UTC") Timecop.freeze(@time) end teardown do Timecop.return end test "emit with invalid event" do d = create_driver d.instance.start d.instance.after_start assert_raise ArgumentError, "time must be a Fluent::EventTime (or Integer)" do d.instance.emit_events('test', OneEventStream.new('string', 10)) end end test "plugin can get key of chunk in #write" do d = create_driver d.instance.start d.instance.after_start d.instance.emit_events('test', OneEventStream.new(event_time("2016-11-08 17:44:30 +0900"), {"message" => "yay"})) d.instance.force_flush waiting(10) do sleep 0.1 until d.instance.written_chunk_keys.size == 1 end assert_equal [], d.instance.errors_in_write assert_equal ["2016110808"], d.instance.written_chunk_keys # default timezone is UTC end test "check formatted time compatibility with utc. Should Z, not +00:00" do d = create_driver(CONFIG + %[ utc include_time_key ]) time = Time.parse("2016-11-08 12:00:00 UTC").to_i d.emit({"a" => 1}, time) d.expect_format %[2016-11-08T12:00:00Z\ttest\t{"a":1,"time":"2016-11-08T12:00:00Z"}\n] d.run end end end end