require_relative '../helper'
require 'fluent/test/driver/multi_output'
require 'fluent/plugin/out_copy'
require 'fluent/event'
# Tests for Fluent::Plugin::CopyOutput, the multi-output plugin that hands each
# incoming event stream to every configured <store> child output.
#
# The child outputs used here (`test` / `test2`) are loaded from the
# `../scripts` directory relative to this file.
class CopyOutputTest < Test::Unit::TestCase
  class << self
    # Runs once before the test case: exposes the helper plugin scripts on the
    # load path and loads the two dummy output plugins.
    def startup
      $LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts'))
      require 'fluent/plugin/out_test'
      require 'fluent/plugin/out_test2'
    end

    # Runs once after the test case: drops the first load path entry, which is
    # the one `startup` prepended (assuming nothing else was prepended since).
    def shutdown
      $LOAD_PATH.shift
    end
  end

  def setup
    Fluent::Test.setup
  end

  # Three child outputs. Each one must live in its own <store> section;
  # without the sections the plugin has no children to instantiate.
  CONFIG = %[
    <store>
      @type test
      name c0
    </store>
    <store>
      @type test2
      name c1
    </store>
    <store>
      @type test
      name c2
    </store>
  ]

  # Builds a MultiOutput test driver around CopyOutput.
  #
  # @param conf [String] plugin configuration text (defaults to CONFIG)
  # @return the configured driver
  def create_driver(conf = CONFIG)
    Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::CopyOutput).configure(conf)
  end

  # Each <store> section becomes one child output, in declaration order.
  def test_configure
    d = create_driver

    outputs = d.instance.outputs
    assert_equal 3, outputs.size
    assert_equal Fluent::Plugin::TestOutput, outputs[0].class
    assert_equal Fluent::Plugin::Test2Output, outputs[1].class
    assert_equal Fluent::Plugin::TestOutput, outputs[2].class
    assert_equal "c0", outputs[0].name
    assert_equal "c1", outputs[1].name
    assert_equal "c2", outputs[2].name
  end

  # Events fed one by one must reach every child output unchanged.
  def test_feed_events
    d = create_driver

    # Only the `test2` child (index 1) is expected to have a router.
    assert !d.instance.outputs[0].has_router?
    assert_not_nil d.instance.outputs[1].router
    assert !d.instance.outputs[2].has_router?

    time = event_time("2011-01-02 13:14:15 UTC")
    d.run(default_tag: 'test') do
      d.feed(time, {"a" => 1})
      d.feed(time, {"a" => 2})
    end

    d.instance.outputs.each {|o|
      assert_equal [ [time, {"a"=>1}], [time, {"a"=>2}] ], o.events
    }
  end

  # Regression test (per its name): a MessagePackEventStream must be readable
  # by every child output, not just the first one that iterates it.
  def test_msgpack_unpacker_cache_bug_for_msgpack_event_stream
    d = create_driver

    time = event_time("2011-01-02 13:14:15 UTC")
    source = Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"a" => 2}] ])
    es = Fluent::MessagePackEventStream.new(source.to_msgpack_stream)

    d.run(default_tag: 'test') do
      d.feed(es)
    end

    d.instance.outputs.each { |o|
      assert_equal [ [time, {"a"=>1}], [time, {"a"=>2}] ], o.events
    }
  end

  # Builds a driver with two `test` children whose first child mutates every
  # record it receives, so that record sharing between children is observable.
  #
  # @param does_deep_copy [Boolean] value written to the `deep_copy` parameter
  # @return the configured driver
  def create_event_test_driver(does_deep_copy = false)
    config = %[
      deep_copy #{does_deep_copy}
      <store>
        @type test
        name output1
      </store>
      <store>
        @type test
        name output2
      </store>
    ]

    d = Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::CopyOutput).configure(config)
    d.instance.outputs[0].define_singleton_method(:process) do |tag, es|
      es.each do |_time, record|
        record['foo'] = 'bar'
      end
      # Arguments are passed explicitly: implicit-argument `super` is not
      # supported inside a method defined from a block.
      super(tag, es)
    end
    d
  end

  # Fixtures for the data-driven test below. Every case gets its own stream
  # because the first child output mutates the records in place.
  time = event_time("2013-05-26 06:37:22 UTC")
  mes0 = Fluent::MultiEventStream.new
  mes0.add(time, {"a" => 1})
  mes0.add(time, {"b" => 1})
  mes1 = Fluent::MultiEventStream.new
  mes1.add(time, {"a" => 1})
  mes1.add(time, {"b" => 1})

  data(
    "OneEventStream without deep_copy" => [false, Fluent::OneEventStream.new(time, {"a" => 1})],
    "OneEventStream with deep_copy" => [true, Fluent::OneEventStream.new(time, {"a" => 1})],
    "ArrayEventStream without deep_copy" => [false, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])],
    "ArrayEventStream with deep_copy" => [true, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])],
    "MultiEventStream without deep_copy" => [false, mes0],
    "MultiEventStream with deep_copy" => [true, mes1],
  )
  # With deep_copy the second child must see independent, unmodified records;
  # without it both children must see the very same (mutated) record objects.
  def test_deep_copy_controls_shallow_or_deep_copied(data)
    does_deep_copy, es = data

    d = create_event_test_driver(does_deep_copy)

    d.run(default_tag: 'test') do
      d.feed(es)
    end

    events = d.instance.outputs.map(&:events)

    if does_deep_copy
      events[0].each_with_index do |entry0, i|
        record0 = entry0.last
        record1 = events[1][i].last

        assert{ record0.object_id != record1.object_id }
        assert_equal "bar", record0["foo"]
        assert !record1.has_key?("foo")
      end
    else
      events[0].each_with_index do |entry0, i|
        record0 = entry0.last
        record1 = events[1][i].last

        assert{ record0.object_id == record1.object_id }
        assert_equal "bar", record0["foo"]
        assert_equal "bar", record1["foo"]
      end
    end
  end

  # Same shape as CONFIG, but every store carries the `ignore_error` argument
  # so that a failure in one child is not propagated to the caller.
  IGNORE_ERROR_CONFIG = %[
    <store ignore_error>
      @type test
      name c0
    </store>
    <store ignore_error>
      @type test
      name c1
    </store>
    <store ignore_error>
      @type test
      name c2
    </store>
  ]

  # A child output that raises must not abort the copy when its store is
  # marked with `ignore_error`.
  def test_ignore_error
    d = create_driver(IGNORE_ERROR_CONFIG)

    # override to raise an error
    d.instance.outputs[0].define_singleton_method(:process) do |tag, es|
      raise ArgumentError, 'Failed'
    end

    # Use the shared helper like the other tests instead of Time.parse, which
    # depends on the `time` library having been required elsewhere.
    time = event_time("2011-01-02 13:14:15 UTC")
    assert_nothing_raised do
      d.run(default_tag: 'test') do
        d.feed(time, {"a"=>1})
      end
    end
  end
end