require_relative 'helper'
require 'fluent/event_router'
require 'fluent/system_config'
require 'timecop'
require_relative 'test_plugin_classes'
class RootAgentTest < ::Test::Unit::TestCase
include Fluent
include FluentTest
def test_initialize
ra = RootAgent.new(log: $log)
assert_equal 0, ra.instance_variable_get(:@suppress_emit_error_log_interval)
assert_nil ra.instance_variable_get(:@next_emit_error_log_time)
end
data(
'suppress interval' => [{'emit_error_log_interval' => 30}, {:@suppress_emit_error_log_interval => 30}],
'without source' => [{'without_source' => true}, {:@without_source => true}],
'enable input metrics' => [{'enable_input_metrics' => true}, {:@enable_input_metrics => true}],
)
def test_initialize_with_opt(data)
opt, expected = data
ra = RootAgent.new(log: $log, system_config: SystemConfig.new(opt))
expected.each { |k, v|
assert_equal v, ra.instance_variable_get(k)
}
end
sub_test_case 'configure' do
setup do
@ra = RootAgent.new(log: $log)
stub(Engine).root_agent { @ra }
end
def configure_ra(conf_str)
conf = Config.parse(conf_str, "(test)", "(test_dir)", true)
@ra.configure(conf)
@ra
end
test 'empty' do
ra = configure_ra('')
assert_empty ra.inputs
assert_empty ra.labels
assert_empty ra.outputs
assert_empty ra.filters
assert_nil ra.context
assert_nil ra.error_collector
end
test 'raises configuration error for missing type of source' do
conf = <<-EOC
EOC
errmsg = "Missing '@type' parameter on directive"
assert_raise Fluent::ConfigError.new(errmsg) do
configure_ra(conf)
end
end
test 'raises configuration error for missing type of match' do
conf = <<-EOC
@type test_in
EOC
errmsg = "Missing '@type' parameter on directive"
assert_raise Fluent::ConfigError.new(errmsg) do
configure_ra(conf)
end
end
test 'raises configuration error for missing type of filter' do
conf = <<-EOC
@type test_in
@type test_out
EOC
errmsg = "Missing '@type' parameter on directive"
assert_raise Fluent::ConfigError.new(errmsg) do
configure_ra(conf)
end
end
test 'raises configuration error if there are two same label section' do
conf = <<-EOC
@type test_in
@label @test
@type test_out
@type test_out
EOC
errmsg = "Section appears twice"
assert_raise Fluent::ConfigError.new(errmsg) do
configure_ra(conf)
end
end
test 'raises configuration error for label without name' do
conf = <<-EOC
@type test_out
EOC
errmsg = "Missing symbol argument on directive"
assert_raise Fluent::ConfigError.new(errmsg) do
configure_ra(conf)
end
end
test 'raises configuration error for ' do
conf = <<-EOC
@type test_in
@label @ROOT
@type test_out
EOC
errmsg = "@ROOT for is not permitted, reserved for getting root router"
assert_raise Fluent::ConfigError.new(errmsg) do
configure_ra(conf)
end
end
test 'raises configuration error if there are not match sections in label section' do
conf = <<-EOC
@type test_in
@label @test
@type test_out
EOC
errmsg = "Missing sections in section"
assert_raise Fluent::ConfigError.new(errmsg) do
configure_ra(conf)
end
end
test 'with plugins' do
# check @type and type in one configuration
conf = <<-EOC
@type test_in
@id test_in
type test_filter
id test_filter
@type relabel
@id test_relabel
@label @test
type test_out
id test_out
@type null
EOC
ra = configure_ra(conf)
assert_kind_of FluentTestInput, ra.inputs.first
assert_kind_of Plugin::RelabelOutput, ra.outputs.first
assert_kind_of FluentTestFilter, ra.filters.first
assert ra.error_collector
%W(@test @ERROR).each { |label_symbol|
assert_include ra.labels, label_symbol
assert_kind_of Label, ra.labels[label_symbol]
}
test_label = ra.labels['@test']
assert_kind_of FluentTestOutput, test_label.outputs.first
assert_equal ra, test_label.root_agent
error_label = ra.labels['@ERROR']
assert_kind_of Fluent::Plugin::NullOutput, error_label.outputs.first
end
end
sub_test_case 'start/shutdown' do
def setup_root_agent(conf)
ra = RootAgent.new(log: $log)
stub(Engine).root_agent { ra }
ra.configure(Config.parse(conf, "(test)", "(test_dir)", true))
ra
end
test 'plugin status' do
ra = setup_root_agent(<<-EOC)
@type test_in
@id test_in
type test_filter
id test_filter
@type test_out
@id test_out
EOC
ra.start
assert_true ra.inputs.first.started
assert_true ra.filters.first.started
assert_true ra.outputs.first.started
ra.shutdown
assert_false ra.inputs.first.started
assert_false ra.filters.first.started
assert_false ra.outputs.first.started
end
test 'output plugin threads should run before input plugin is blocked with buffer full' do
ra = setup_root_agent(<<-EOC)
@type test_in_gen
@id test_in_gen
@type test_out_buffered
@id test_out_buffered
chunk_limit_size 1k
queue_limit_length 2
flush_thread_count 2
overflow_action block
EOC
waiting(5) { ra.start }
assert_true ra.inputs.first.started
assert_true ra.outputs.first.started
ra.shutdown
assert_false ra.inputs.first.started
assert_false ra.outputs.first.started
end
end
sub_test_case 'configured with label and secondary plugin' do
setup do
@ra = RootAgent.new(log: $log)
stub(Engine).root_agent{ @ra }
@ra.configure(Config.parse(<<-EOC, "(test)", "(test_dir)", true))
@type test_in
@label @route_a
@type test_out_buffered
@type test_out_emit
@type test_out
EOC
end
test 'secondary plugin has an event router for the label which the plugin is in' do
assert_equal 1, @ra.inputs.size
assert_equal 2, @ra.labels.size
assert_equal ['@route_a', '@route_b'], @ra.labels.keys
assert_equal '@route_a', @ra.labels['@route_a'].context
assert_equal '@route_b', @ra.labels['@route_b'].context
c1 = @ra.labels['@route_a']
assert_equal 1, c1.outputs.size
assert !c1.outputs.first.has_router?
assert c1.outputs.first.secondary
assert c1.outputs.first.secondary.has_router?
assert_equal c1.event_router, c1.outputs.first.secondary.router
end
end
sub_test_case 'configured with label and secondary plugin with @label specifier' do
setup do
@ra = RootAgent.new(log: $log)
stub(Engine).root_agent{ @ra }
@ra.configure(Config.parse(<<-EOC, "(test)", "(test_dir)", true))
@type test_in
@label @route_a
@type test_out_buffered
@type test_out_emit
@label @route_b
@type test_out
EOC
end
test 'secondary plugin has an event router for the label specified in secondary section' do
assert_equal 1, @ra.inputs.size
assert_equal 2, @ra.labels.size
assert_equal ['@route_a', '@route_b'], @ra.labels.keys
assert_equal '@route_a', @ra.labels['@route_a'].context
assert_equal '@route_b', @ra.labels['@route_b'].context
c1 = @ra.labels['@route_a']
c2 = @ra.labels['@route_b']
assert_equal 1, c1.outputs.size
assert !c1.outputs.first.has_router?
assert c1.outputs.first.secondary
assert c1.outputs.first.secondary.has_router?
assert_equal c2.event_router, c1.outputs.first.secondary.router
end
end
sub_test_case 'configured with label and secondary plugin with @label specifier in primary output' do
setup do
@ra = RootAgent.new(log: $log)
stub(Engine).root_agent{ @ra }
@ra.configure(Config.parse(<<-EOC, "(test)", "(test_dir)", true))
@type test_in
@label @route_a
@type test_out_emit
@label @route_b
@type test_out_emit
@type test_out
EOC
end
test 'secondary plugin has an event router for the label specified in secondary section' do
assert_equal 1, @ra.inputs.size
assert_equal 2, @ra.labels.size
assert_equal ['@route_a', '@route_b'], @ra.labels.keys
assert_equal '@route_a', @ra.labels['@route_a'].context
assert_equal '@route_b', @ra.labels['@route_b'].context
c1 = @ra.labels['@route_a']
c2 = @ra.labels['@route_b']
assert_equal 1, c1.outputs.size
assert c1.outputs.first.secondary
p1 = c1.outputs.first
assert p1.has_router?
assert_equal c1.event_router, p1.context_router
assert_equal c2.event_router, p1.router
s1 = p1.secondary
assert s1.has_router?
assert_equal c1.event_router, s1.context_router
assert_equal c2.event_router, s1.router
end
end
sub_test_case 'configured with MultiOutput plugins' do
setup do
@ra = RootAgent.new(log: $log)
stub(Engine).root_agent { @ra }
@ra.configure(Config.parse(<<-EOC, "(test)", "(test_dir)", true))
@type test_in
@id test_in
@type test_filter
@id test_filter
@type copy
@id test_copy
@type test_out
@id test_out1
@type test_out
@id test_out2
EOC
@ra
end
test 'plugin status with multi output' do
assert_equal 1, @ra.inputs.size
assert_equal 1, @ra.filters.size
assert_equal 3, @ra.outputs.size
@ra.start
assert_equal [true], @ra.inputs.map{|i| i.started? }
assert_equal [true], @ra.filters.map{|i| i.started? }
assert_equal [true, true, true], @ra.outputs.map{|i| i.started? }
assert_equal [true], @ra.inputs.map{|i| i.after_started? }
assert_equal [true], @ra.filters.map{|i| i.after_started? }
assert_equal [true, true, true], @ra.outputs.map{|i| i.after_started? }
@ra.shutdown
assert_equal [true], @ra.inputs.map{|i| i.stopped? }
assert_equal [true], @ra.filters.map{|i| i.stopped? }
assert_equal [true, true, true], @ra.outputs.map{|i| i.stopped? }
assert_equal [true], @ra.inputs.map{|i| i.before_shutdown? }
assert_equal [true], @ra.filters.map{|i| i.before_shutdown? }
assert_equal [true, true, true], @ra.outputs.map{|i| i.before_shutdown? }
assert_equal [true], @ra.inputs.map{|i| i.shutdown? }
assert_equal [true], @ra.filters.map{|i| i.shutdown? }
assert_equal [true, true, true], @ra.outputs.map{|i| i.shutdown? }
assert_equal [true], @ra.inputs.map{|i| i.after_shutdown? }
assert_equal [true], @ra.filters.map{|i| i.after_shutdown? }
assert_equal [true, true, true], @ra.outputs.map{|i| i.after_shutdown? }
assert_equal [true], @ra.inputs.map{|i| i.closed? }
assert_equal [true], @ra.filters.map{|i| i.closed? }
assert_equal [true, true, true], @ra.outputs.map{|i| i.closed? }
assert_equal [true], @ra.inputs.map{|i| i.terminated? }
assert_equal [true], @ra.filters.map{|i| i.terminated? }
assert_equal [true, true, true], @ra.outputs.map{|i| i.terminated? }
end
end
sub_test_case 'configured with MultiOutput plugins and labels' do
setup do
@ra = RootAgent.new(log: $log)
stub(Engine).root_agent { @ra }
@ra.configure(Config.parse(<<-EOC, "(test)", "(test_dir)", true))
@type test_in
@id test_in
@label @testing
@type test_filter
@id test_filter
@type copy
@id test_copy
@type test_out
@id test_out1
@type test_out
@id test_out2
EOC
@ra
end
test 'plugin status with multi output' do
assert_equal 1, @ra.inputs.size
assert_equal 0, @ra.filters.size
assert_equal 0, @ra.outputs.size
assert_equal 1, @ra.labels.size
assert_equal '@testing', @ra.labels.keys.first
assert_equal 1, @ra.labels.values.first.filters.size
assert_equal 3, @ra.labels.values.first.outputs.size
label_filters = @ra.labels.values.first.filters
label_outputs = @ra.labels.values.first.outputs
@ra.start
assert_equal [true], @ra.inputs.map{|i| i.started? }
assert_equal [true], label_filters.map{|i| i.started? }
assert_equal [true, true, true], label_outputs.map{|i| i.started? }
@ra.shutdown
assert_equal [true], @ra.inputs.map{|i| i.stopped? }
assert_equal [true], label_filters.map{|i| i.stopped? }
assert_equal [true, true, true], label_outputs.map{|i| i.stopped? }
assert_equal [true], @ra.inputs.map{|i| i.before_shutdown? }
assert_equal [true], label_filters.map{|i| i.before_shutdown? }
assert_equal [true, true, true], label_outputs.map{|i| i.before_shutdown? }
assert_equal [true], @ra.inputs.map{|i| i.shutdown? }
assert_equal [true], label_filters.map{|i| i.shutdown? }
assert_equal [true, true, true], label_outputs.map{|i| i.shutdown? }
assert_equal [true], @ra.inputs.map{|i| i.after_shutdown? }
assert_equal [true], label_filters.map{|i| i.after_shutdown? }
assert_equal [true, true, true], label_outputs.map{|i| i.after_shutdown? }
assert_equal [true], @ra.inputs.map{|i| i.closed? }
assert_equal [true], label_filters.map{|i| i.closed? }
assert_equal [true, true, true], label_outputs.map{|i| i.closed? }
assert_equal [true], @ra.inputs.map{|i| i.terminated? }
assert_equal [true], label_filters.map{|i| i.terminated? }
assert_equal [true, true, true], label_outputs.map{|i| i.terminated? }
end
test 'plugin #shutdown is not called twice' do
assert_equal 1, @ra.inputs.size
assert_equal 0, @ra.filters.size
assert_equal 0, @ra.outputs.size
assert_equal 1, @ra.labels.size
assert_equal '@testing', @ra.labels.keys.first
assert_equal 1, @ra.labels.values.first.filters.size
assert_equal 3, @ra.labels.values.first.outputs.size
@ra.start
old_level = @ra.log.level
begin
@ra.log.instance_variable_get(:@logger).level = Fluent::Log::LEVEL_INFO - 1
assert_equal Fluent::Log::LEVEL_INFO, @ra.log.level
@ra.log.out.flush_logs = false
@ra.shutdown
test_out1_shutdown_logs = @ra.log.out.logs.select{|line| line =~ /shutting down output plugin type=:test_out plugin_id="test_out1"/ }
assert_equal 1, test_out1_shutdown_logs.size
ensure
@ra.log.out.flush_logs = true
@ra.log.out.reset
@ra.log.level = old_level
end
end
end
sub_test_case 'configured with MultiOutput plugin which creates plugin instances dynamically' do
setup do
@ra = RootAgent.new(log: $log)
stub(Engine).root_agent { @ra }
@ra.configure(Config.parse(<<-EOC, "(test)", "(test_dir)", true))
@type test_in
@id test_in
@label @testing
@type test_dynamic_out
@id test_dyn
EOC
@ra
end
test 'plugin status with multi output' do
assert_equal 1, @ra.inputs.size
assert_equal 0, @ra.filters.size
assert_equal 0, @ra.outputs.size
assert_equal 1, @ra.labels.size
assert_equal '@testing', @ra.labels.keys.first
assert_equal 0, @ra.labels.values.first.filters.size
assert_equal 1, @ra.labels.values.first.outputs.size
dyn_out = @ra.labels.values.first.outputs.first
assert_nil dyn_out.child
@ra.start
assert_equal 1, @ra.labels.values.first.outputs.size
assert dyn_out.child
assert_false dyn_out.child.outputs_statically_created
assert_equal 2, dyn_out.child.outputs.size
assert_equal true, dyn_out.child.outputs[0].started?
assert_equal true, dyn_out.child.outputs[1].started?
assert_equal true, dyn_out.child.outputs[0].after_started?
assert_equal true, dyn_out.child.outputs[1].after_started?
@ra.shutdown
assert_equal 1, @ra.labels.values.first.outputs.size
assert_false dyn_out.child.outputs_statically_created
assert_equal 2, dyn_out.child.outputs.size
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.stopped? }
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.before_shutdown? }
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.shutdown? }
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.after_shutdown? }
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.closed? }
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.terminated? }
end
end
sub_test_case 'configure emit_error_interval' do
setup do
system_config = SystemConfig.new
system_config.emit_error_log_interval = 30
@ra = RootAgent.new(log: $log, system_config: system_config)
stub(Engine).root_agent { @ra }
@ra.log.out.reset
one_minute_ago = Time.now.to_i - 60
Timecop.freeze(one_minute_ago)
end
teardown do
Timecop.return
end
test 'suppresses errors' do
mock(@ra.log).warn_backtrace()
e = StandardError.new('standard error')
begin
@ra.handle_emits_error("tag", nil, e)
rescue
end
begin
@ra.handle_emits_error("tag", nil, e)
rescue
end
assert_equal 1, @ra.log.out.logs.size
end
end
sub_test_case 'configured at worker2 with 4 workers environment' do
setup do
ENV['SERVERENGINE_WORKER_ID'] = '2'
@ra = RootAgent.new(log: $log)
system_config = SystemConfig.new
system_config.workers = 4
stub(Engine).worker_id { 2 }
stub(Engine).root_agent { @ra }
stub(Engine).system_config { system_config }
@ra
end
teardown '' do
ENV.delete('SERVERENGINE_WORKER_ID')
end
def configure_ra(conf_str)
conf = Config.parse(conf_str, "(test)", "(test_dir)", true)
@ra.configure(conf)
@ra
end
test 'raises configuration error for missing worker id' do
errmsg = 'Missing worker id on directive'
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
EOC
configure_ra(conf)
end
end
test 'raises configuration error for too big worker id' do
errmsg = "worker id 4 specified by directive is not allowed. Available worker id is between 0 and 3"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
EOC
configure_ra(conf)
end
end
test 'raises configuration error for too big worker id on multi workers syntax' do
errmsg = "worker id 4 specified by directive is not allowed. Available worker id is between 0 and 3"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
EOC
configure_ra(conf)
end
end
test 'raises configuration error for worker id collisions on multi workers syntax' do
errmsg = "specified worker_id<2> collisions is detected on directive. Available worker id(s): [3]"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
EOC
configure_ra(conf)
end
end
test 'raises configuration error for worker id collisions on multi workers syntax when multi available worker_ids are left' do
errmsg = "specified worker_id<1> collisions is detected on directive. Available worker id(s): [2, 3]"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
EOC
configure_ra(conf)
end
end
test 'raises configuration error for too big worker id on invalid reversed multi workers syntax' do
errmsg = "greater first_worker_id<3> than last_worker_id<0> specified by directive is not allowed. Available multi worker assign syntax is -"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
EOC
configure_ra(conf)
end
end
test 'raises configuration error for invalid elements as a child of worker section' do
errmsg = ' section cannot have directive'
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
EOC
configure_ra(conf)
end
end
test 'raises configuration error when configured plugins do not have support multi worker configuration' do
errmsg = "Plugin 'test_out' does not support multi workers configuration (FluentTest::FluentTestOutput)"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
@type test_out
EOC
configure_ra(conf)
end
end
test 'does not raise configuration error when configured plugins in worker section do not have support multi worker configuration' do
assert_nothing_raised do
conf = <<-EOC
@type test_out
EOC
configure_ra(conf)
end
end
test 'does not raise configuration error when configured plugins as a children of MultiOutput in worker section do not have support multi worker configuration' do
assert_nothing_raised do
conf = <<-EOC
@type copy
@type test_out
@type test_out
EOC
configure_ra(conf)
end
end
test 'does not raise configuration error when configured plugins owned by plugin do not have support multi worker configuration' do
assert_nothing_raised do
conf = <<-EOC
@type test_out_buffered
@type test_buffer
EOC
configure_ra(conf)
end
end
test 'with plugins' do
conf = <<-EOC
@type test_in
@id test_in
type test_filter
id test_filter
@type relabel
@id test_relabel
@label @test
type test_out
id test_out
@type null
EOC
ra = configure_ra(conf)
assert_kind_of FluentTestInput, ra.inputs.first
assert_kind_of Plugin::RelabelOutput, ra.outputs.first
assert_kind_of FluentTestFilter, ra.filters.first
assert ra.error_collector
%W(@test @ERROR).each { |label_symbol|
assert_include ra.labels, label_symbol
assert_kind_of Label, ra.labels[label_symbol]
}
test_label = ra.labels['@test']
assert_kind_of FluentTestOutput, test_label.outputs.first
assert_equal ra, test_label.root_agent
error_label = ra.labels['@ERROR']
assert_kind_of Fluent::Plugin::NullOutput, error_label.outputs.first
end
test 'with plugins but for another worker' do
conf = <<-EOC
@type test_in
@id test_in
type test_filter
id test_filter
@type relabel
@id test_relabel
@label @test
type test_out
id test_out
@type null
EOC
ra = configure_ra(conf)
assert_equal 0, ra.inputs.size
assert_equal 0, ra.outputs.size
assert_equal 0, ra.filters.size
assert_equal 0, ra.labels.size
refute ra.error_collector
end
test 'with plugins for workers syntax should match worker_id equals to 2' do
conf = <<-EOC
@type forward
@type test_filter
@id test_filter
@type stdout
@type null
EOC
ra = configure_ra(conf)
assert_kind_of Fluent::Plugin::ForwardInput, ra.inputs.first
assert_kind_of Fluent::Plugin::StdoutOutput, ra.outputs.first
assert_kind_of FluentTestFilter, ra.filters.first
assert ra.error_collector
end
end
end