require_relative '../helper'
require 'fluent/test/driver/multi_output'
require 'fluent/plugin/out_copy'
require 'fluent/event'
require 'flexmock/test_unit'
class CopyOutputTest < Test::Unit::TestCase
include FlexMock::TestCase
# Class-level test-unit hook: puts the sibling `scripts` directory at the
# front of the load path and loads the `out_test` / `out_test2` helper plugins.
def self.startup
  scripts_dir = File.expand_path('../scripts', File.dirname(__FILE__))
  $LOAD_PATH.unshift(scripts_dir)
  require 'fluent/plugin/out_test'
  require 'fluent/plugin/out_test2'
end

# Class-level test-unit hook paired with startup: drops the first entry of
# the load path again.
def self.shutdown
  $LOAD_PATH.shift
end
# Per-test hook: delegates to Fluent::Test.setup (defined in the Fluentd test
# helpers) to prepare the test environment.
def setup
Fluent::Test.setup
end
# Default configuration: three <store> sections (test, test2, test) named
# c0..c2. Each output must live in its own <store> section; without the
# wrappers the copy plugin cannot see three separate outputs.
CONFIG = %[
  <store>
    @type test
    name c0
  </store>
  <store>
    @type test2
    name c1
  </store>
  <store>
    @type test
    name c2
  </store>
]
# Builds a multi-output test driver around the copy plugin and configures it
# with +conf+ (the default three-store CONFIG when omitted). Returns whatever
# the driver's #configure returns.
def create_driver(conf = CONFIG)
  driver = Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::CopyOutput)
  driver.configure(conf)
end
# The default config yields three outputs of the expected classes and names,
# with deep_copy off and copy_mode :no_copy.
def test_configure
  driver = create_driver
  stores = driver.instance.outputs
  assert_equal 3, stores.size

  expected_classes = [
    Fluent::Plugin::TestOutput,
    Fluent::Plugin::Test2Output,
    Fluent::Plugin::TestOutput,
  ]
  expected_classes.each_with_index do |klass, index|
    assert_equal klass, stores[index].class
  end

  ["c0", "c1", "c2"].each_with_index do |store_name, index|
    assert_equal store_name, stores[index].name
  end

  plugin = driver.instance
  assert_false plugin.deep_copy
  assert_equal :no_copy, plugin.copy_mode
end
# Deliberately invalid use of ignore_if_prev_success (it is set on the very
# first <store>, and the last <store> sets it without ignore_error);
# configuring the plugin with this is expected to raise Fluent::ConfigError.
ERRORNEOUS_IGNORE_IF_PREV_SUCCESS_CONFIG = %[
  <store ignore_if_prev_success ignore_error>
    @type test
    name c0
  </store>
  <store ignore_if_prev_success ignore_error>
    @type test
    name c1
  </store>
  <store ignore_if_prev_success>
    @type test
    name c2
  </store>
]
# Configuring with the invalid ignore_if_prev_success config must raise
# Fluent::ConfigError.
def test_configure_with_errorneus_ignore_if_prev_success
  assert_raise(Fluent::ConfigError) { create_driver(ERRORNEOUS_IGNORE_IF_PREV_SUCCESS_CONFIG) }
end
# Every <store> sets ignore_error and none sets ignore_if_prev_success.
# Log level is info so the plugin's log output is captured by the driver.
ALL_IGNORE_ERROR_WITHOUT_IGNORE_IF_PREV_SUCCESS_CONFIG = %[
  @log_level info
  <store ignore_error>
    @type test
    name c0
  </store>
  <store ignore_error>
    @type test
    name c1
  </store>
  <store ignore_error>
    @type test
    name c2
  </store>
]
# When every store sets ignore_error but none sets ignore_if_prev_success,
# exactly one matching message is expected in the driver's logs.
def test_configure_all_ignore_errors_without_ignore_if_prev_success
  d = create_driver(ALL_IGNORE_ERROR_WITHOUT_IGNORE_IF_PREV_SUCCESS_CONFIG)
  # The plugin's message names the <store> directive literally.
  expected = /ignore_errors are specified in all <store>, but ignore_if_prev_success is not specified./
  matches = d.logs.grep(expected)
  assert_equal(1, matches.length, "Logs do not contain '#{expected}' '#{d.logs}'")
end
# Setting `deep_copy true` must leave deep_copy enabled and select the
# :shallow copy_mode.
def test_configure_with_deep_copy_and_use_shallow_copy_mode
  # The single output has to be wrapped in a <store> section to be picked up.
  d = create_driver(%[
    deep_copy true
    <store>
      @type test
      name c0
    </store>
  ])
  outputs = d.instance.outputs
  assert_equal 1, outputs.size
  assert_equal Fluent::Plugin::TestOutput, outputs[0].class
  assert_equal "c0", outputs[0].name
  assert_true d.instance.deep_copy
  assert_equal :shallow, d.instance.copy_mode
end
# Two events fed through the default config must show up, in order, in every
# store. Also checks router presence on each store beforehand.
def test_feed_events
  driver = create_driver
  first, second, third = driver.instance.outputs
  assert(!first.has_router?)
  assert_not_nil second.router
  assert(!third.has_router?)

  timestamp = event_time("2011-01-02 13:14:15 UTC")
  driver.run(default_tag: 'test') do
    [1, 2].each { |value| driver.feed(timestamp, {"a" => value}) }
  end

  expected_events = [[timestamp, {"a"=>1}], [timestamp, {"a"=>2}]]
  driver.instance.outputs.each do |output|
    assert_equal expected_events, output.events
  end
end
# Feeds a MessagePackEventStream (built by serializing an ArrayEventStream)
# and checks that every store receives both events.
def test_msgpack_unpacker_cache_bug_for_msgpack_event_stream
  driver = create_driver
  timestamp = event_time("2011-01-02 13:14:15 UTC")
  array_stream = Fluent::ArrayEventStream.new([[timestamp, {"a" => 1}], [timestamp, {"a" => 2}]])
  packed_stream = Fluent::MessagePackEventStream.new(array_stream.to_msgpack_stream)

  driver.run(default_tag: 'test') { driver.feed(packed_stream) }

  expected_events = [[timestamp, {"a"=>1}], [timestamp, {"a"=>2}]]
  driver.instance.outputs.each do |output|
    assert_equal expected_events, output.events
  end
end
# Builds a copy-output driver with two `test` stores and the given copy_mode.
# The first store is patched to add 'foo' => 'bar' to every record before
# delegating to its real #process, so callers can tell whether the second
# store received the same record objects or copies.
#
# copy_mode - copy_mode value written into the config (default 'no_copy').
#
# Returns the configured driver.
def create_event_test_driver(copy_mode = 'no_copy')
  # Each output must be wrapped in its own <store> section.
  config = %[
    copy_mode #{copy_mode}
    <store>
      @type test
      name output1
    </store>
    <store>
      @type test
      name output2
    </store>
  ]
  d = create_driver(config)
  d.instance.outputs[0].define_singleton_method(:process) do |tag, es|
    es.each do |_time, record|
      record['foo'] = 'bar'
    end
    super(tag, es)
  end
  d
end
# Fixtures for the data-driven copy_mode test that follows. Each data set
# entry is [copy_mode, event stream]; every entry gets its own stream built
# from fresh record hashes.
time = event_time("2013-05-26 06:37:22 UTC")
nested_record = lambda { |key, value| {key => value, "nest" => {'k' => 'v'}} }
gen_one_es = lambda { Fluent::OneEventStream.new(time, nested_record.call("a", 1)) }
gen_array_es = lambda {
  Fluent::ArrayEventStream.new([[time, nested_record.call("a", 1)], [time, nested_record.call("b", 2)]])
}
gen_multi_es = lambda {
  stream = Fluent::MultiEventStream.new
  stream.add(time, nested_record.call("a", 1))
  stream.add(time, nested_record.call("b", 1))
  stream
}
data(
  "OneEventStream without copy" => ['no_copy', gen_one_es.call],
  "OneEventStream with shallow" => ['shallow', gen_one_es.call],
  "OneEventStream with marshal" => ['marshal', gen_one_es.call],
  "OneEventStream with deep" => ['deep', gen_one_es.call],
  "ArrayEventStream without copy" => ['no_copy', gen_array_es.call],
  "ArrayEventStream with shallow" => ['shallow', gen_array_es.call],
  "ArrayEventStream with marshal" => ['marshal', gen_array_es.call],
  "ArrayEventStream with deep" => ['deep', gen_array_es.call],
  "MultiEventStream without copy" => ['no_copy', gen_multi_es.call],
  "MultiEventStream with shallow" => ['shallow', gen_multi_es.call],
  "MultiEventStream with marshal" => ['marshal', gen_multi_es.call],
  "MultiEventStream with deep" => ['deep', gen_multi_es.call],
)
# Data-driven check of copy_mode. The first store adds 'foo' => 'bar' to each
# record; the records seen by the second store are then compared with it:
#   no_copy       - same record object, so the mutation is visible and the
#                   nested hash is shared too;
#   shallow       - distinct record without 'foo', nested hash still shared;
#   marshal/deep  - distinct record without 'foo', nested hash distinct.
def test_copy_mode_with_event_streams(data)
  copy_mode, es = data
  driver = create_event_test_driver(copy_mode)
  driver.run(default_tag: 'test') { driver.feed(es) }

  events = driver.instance.outputs.map(&:events)
  records_shared = (copy_mode == 'no_copy')

  events[0].each_with_index do |first_entry, index|
    mutated = first_entry.last
    sibling = events[1][index].last

    if records_shared
      assert_equal mutated.object_id, sibling.object_id
      assert_equal "bar", mutated["foo"]
      assert_equal "bar", sibling["foo"]
      assert_equal mutated['nest'].object_id, sibling['nest'].object_id
      next
    end

    assert_not_equal mutated.object_id, sibling.object_id
    assert_equal "bar", mutated["foo"]
    assert(!sibling.has_key?("foo"))
    if copy_mode == 'shallow'
      assert_equal mutated['nest'].object_id, sibling['nest'].object_id
    else
      assert_not_equal mutated['nest'].object_id, sibling['nest'].object_id
    end
  end
end
# Three `test` stores; the first two carry the ignore_error argument so a
# failure in them is not propagated, the last one is a plain <store>.
IGNORE_ERROR_CONFIG = %[
  <store ignore_error>
    @type test
    name c0
  </store>
  <store ignore_error>
    @type test
    name c1
  </store>
  <store>
    @type test
    name c2
  </store>
]
# With IGNORE_ERROR_CONFIG, an exception raised by the first store must not
# escape from the driver run.
def test_ignore_error
  driver = create_driver(IGNORE_ERROR_CONFIG)
  failing_store = driver.instance.outputs[0]
  # Replace #process so the first store always fails.
  failing_store.define_singleton_method(:process) do |_tag, _es|
    raise ArgumentError, 'Failed'
  end

  timestamp = Time.parse("2011-01-02 13:14:15 UTC").to_i
  assert_nothing_raised do
    driver.run(default_tag: 'test') { driver.feed(timestamp, {"a"=>1}) }
  end
end
# Three `test` stores that all tolerate errors (ignore_error); the second and
# third additionally set ignore_if_prev_success, so they are meant to be
# skipped once an earlier store has succeeded.
IGNORE_IF_PREV_SUCCESS_CONFIG = %[
  <store ignore_error>
    @type test
    name c0
  </store>
  <store ignore_if_prev_success ignore_error>
    @type test
    name c1
  </store>
  <store ignore_if_prev_success ignore_error>
    @type test
    name c2
  </store>
]
# With IGNORE_IF_PREV_SUCCESS_CONFIG and a failing first store, the second
# store must be called exactly once and the third not at all, and no
# exception may escape from the driver run.
def test_ignore_if_prev_success
  driver = create_driver(IGNORE_IF_PREV_SUCCESS_CONFIG)
  first, second, third = driver.instance.outputs

  # Replace #process so the first store always fails.
  first.define_singleton_method(:process) do |_tag, _es|
    raise ArgumentError, 'Failed'
  end

  # Expected ignore_if_prev_success behaviour:
  #   1. the second store is called and succeeds;
  #   2. the third store is never called.
  flexstub(second).should_receive(:process).once
  flexstub(third).should_receive(:process).never

  timestamp = Time.parse("2011-01-02 13:14:15 UTC").to_i
  assert_nothing_raised do
    driver.run(default_tag: 'test') { driver.feed(timestamp, {"a"=>1}) }
  end
end
end