require_relative '../helper'
class TaggedCopyOutputTest < Test::Unit::TestCase
def setup
Fluent::Test.setup
end
CONFIG = %[
type test
name c0
type test
name c1
type test
name c2
]
def create_driver(conf = CONFIG, tag = 'test')
Fluent::Test::OutputTestDriver.new(Fluent::TaggedCopyOutput, tag).configure(conf)
end
def test_configure
d = create_driver
outputs = d.instance.outputs
assert_equal 3, outputs.size
assert_equal Fluent::TestOutput, outputs[0].class
assert_equal Fluent::TestOutput, outputs[1].class
assert_equal Fluent::TestOutput, outputs[2].class
assert_equal "c0", outputs[0].name
assert_equal "c1", outputs[1].name
assert_equal "c2", outputs[2].name
end
def test_emit
d = create_driver
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
d.emit({"a"=>1}, time)
d.emit({"a"=>2}, time)
d.instance.outputs.each {|o|
assert_equal [
[time, {"a"=>1}],
[time, {"a"=>2}],
], o.events
}
end
def test_msgpack_es_emit_bug
d = Fluent::Test::OutputTestDriver.new(Fluent::CopyOutput)
outputs = %w(p1 p2).map do |pname|
p = Fluent::Plugin.new_output('test')
p.configure('name' => pname)
p.define_singleton_method(:emit) do |tag, es, chain|
es.each do |time, record|
super(tag, [[time, record]], chain)
end
end
p
end
d.instance.instance_eval { @outputs = outputs }
es = if defined?(MessagePack::Packer)
time = Time.parse("2013-05-26 06:37:22 UTC").to_i
packer = MessagePack::Packer.new
packer.pack([time, {"a" => 1}])
packer.pack([time, {"a" => 2}])
Fluent::MessagePackEventStream.new(packer.to_s)
else
events = "#{[time, {"a" => 1}].to_msgpack}#{[time, {"a" => 2}].to_msgpack}"
Fluent::MessagePackEventStream.new(events)
end
d.instance.emit('test', es, Fluent::NullOutputChain.instance)
d.instance.outputs.each { |o|
assert_equal [
[time, {"a"=>1}],
[time, {"a"=>2}],
], o.events
}
end
def create_event_test_driver(is_deep_copy = false)
deep_copy_config = %[
deep_copy true
]
output1 = Fluent::Plugin.new_output('test')
output1.configure('name' => 'output1')
output1.define_singleton_method(:emit) do |tag, es, chain|
es.each do |time, record|
record['foo'] = 'bar'
super(tag, [[time, record]], chain)
end
end
output2 = Fluent::Plugin.new_output('test')
output2.configure('name' => 'output2')
output2.define_singleton_method(:emit) do |tag, es, chain|
es.each do |time, record|
super(tag, [[time, record]], chain)
end
end
outputs = [output1, output2]
d = Fluent::Test::OutputTestDriver.new(Fluent::CopyOutput)
d = d.configure(deep_copy_config) if is_deep_copy
d.instance.instance_eval { @outputs = outputs }
d
end
def test_one_event
time = Time.parse("2013-05-26 06:37:22 UTC").to_i
d = create_event_test_driver(false)
es = Fluent::OneEventStream.new(time, {"a" => 1})
d.instance.emit('test', es, Fluent::NullOutputChain.instance)
assert_equal [
[[time, {"a"=>1, "foo"=>"bar"}]],
[[time, {"a"=>1, "foo"=>"bar"}]]
], d.instance.outputs.map{ |o| o.events }
d = create_event_test_driver(true)
es = Fluent::OneEventStream.new(time, {"a" => 1})
d.instance.emit('test', es, Fluent::NullOutputChain.instance)
assert_equal [
[[time, {"a"=>1, "foo"=>"bar"}]],
[[time, {"a"=>1}]]
], d.instance.outputs.map{ |o| o.events }
end
def test_multi_event
time = Time.parse("2013-05-26 06:37:22 UTC").to_i
d = create_event_test_driver(false)
es = Fluent::MultiEventStream.new
es.add(time, {"a" => 1})
es.add(time, {"b" => 2})
d.instance.emit('test', es, Fluent::NullOutputChain.instance)
assert_equal [
[[time, {"a"=>1, "foo"=>"bar"}], [time, {"b"=>2, "foo"=>"bar"}]],
[[time, {"a"=>1, "foo"=>"bar"}], [time, {"b"=>2, "foo"=>"bar"}]]
], d.instance.outputs.map{ |o| o.events }
d = create_event_test_driver(true)
es = Fluent::MultiEventStream.new
es.add(time, {"a" => 1})
es.add(time, {"b" => 2})
d.instance.emit('test', es, Fluent::NullOutputChain.instance)
assert_equal [
[[time, {"a"=>1, "foo"=>"bar"}], [time, {"b"=>2, "foo"=>"bar"}]],
[[time, {"a"=>1}], [time, {"b"=>2}]]
], d.instance.outputs.map{ |o| o.events }
end
## Belows are special tests for tagged_copy
def test_tag_emit
config = %[
tag first
type test
name c0
tag second
type test
name c0
]
d = create_driver(config)
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
d.emit({"a"=>1}, time)
d.emit({"a"=>2}, time)
first = d.instance.outputs.first
assert_equal [
['first', time, {"a"=>1}],
['first', time, {"a"=>2}],
], first.emits
second = d.instance.outputs[1]
assert_equal [
['second', time, {"a"=>1}],
['second', time, {"a"=>2}],
], second.emits
end
def test_add_tag_prefix_emit
config = %[
add_tag_prefix first
type test
name c0
add_tag_prefix second
type test
name c0
]
d = create_driver(config)
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
d.emit({"a"=>1}, time)
d.emit({"a"=>2}, time)
first = d.instance.outputs.first
assert_equal [
['first.test', time, {"a"=>1}],
['first.test', time, {"a"=>2}],
], first.emits
second = d.instance.outputs[1]
assert_equal [
['second.test', time, {"a"=>1}],
['second.test', time, {"a"=>2}],
], second.emits
end
def test_remove_tag_prefix_emit
config = %[
remove_tag_prefix first
type test
name c0
remove_tag_prefix second
type test
name c0
]
d = create_driver(config)
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
d.tag = 'first.test'
d.emit({"a"=>1}, time)
d.tag = 'second.test'
d.emit({"a"=>2}, time)
first = d.instance.outputs.first
assert_equal [
['test', time, {"a"=>1}],
['second.test', time, {"a"=>2}],
], first.emits
second = d.instance.outputs[1]
assert_equal [
['first.test', time, {"a"=>1}],
['test', time, {"a"=>2}],
], second.emits
end
end