require_relative '../helper'
require 'timecop'
require 'fluent/test/driver/filter'
require 'fluent/plugin/filter_record_transformer'
require 'flexmock/test_unit'
class RecordTransformerFilterTest < Test::Unit::TestCase
include Fluent
include FlexMock::TestCase
# Per-test fixture: runs Test.setup (presumably Fluent::Test, reachable via
# `include Fluent` -- confirm), stores the tag/hostname values the
# expectations compare against, and freezes the clock at @time with Timecop.
setup do
  Test.setup
  @tag = 'test.tag'
  @tag_parts = @tag.split('.')
  @hostname = Socket.gethostname.chomp
  @time = event_time('2010-05-04 03:02:01 UTC')
  Timecop.freeze(@time)
end
# Release the Timecop clock freeze after every test.
teardown { Timecop.return }
# Builds a filter test driver around RecordTransformerFilter and applies
# +conf+ to it.
#
# @param conf [String] plugin configuration text (empty by default)
# @return whatever the driver's #configure returns
def create_driver(conf = '')
  driver = Fluent::Test::Driver::Filter.new(Fluent::Plugin::RecordTransformerFilter)
  driver.configure(conf)
end
# Configuration-time checks.
sub_test_case 'configure' do
  # The default (empty) configuration must be accepted.
  test 'check default' do
    assert_nothing_raised { create_driver }
  end

  # keep_keys on its own (without renew_record) must raise a ConfigError.
  test "keep_keys must be specified together with renew_record true" do
    assert_raise(Fluent::ConfigError) { create_driver('keep_keys a') }
  end
end
sub_test_case "test options" do
# Runs the filter configured by +config+ over one record per entry of +msgs+
# and returns the driver's filtered events.
#
# @param config [String] plugin configuration text
# @param msgs [Array] values placed under the 'message' key, one record each
# @return the value of the driver's #filtered
def filter(config, msgs = [''])
  driver = create_driver(config)
  driver.run do
    msgs.each do |msg|
      # A fresh record (including the nested hash) is built for every message.
      record = {
        'foo' => 'bar',
        'message' => msg,
        'nest' => {'k1' => 'v1', 'k2' => 'v2'},
      }
      driver.feed(@tag, @time, record)
    end
  end
  driver.filtered
end
# Shared configuration text for the option tests below: it sets hostname, tag
# and time fields from placeholders and rewrites 'message' from the hostname,
# the last tag part and the incoming record's message.
CONFIG = %[
hostname ${hostname}
tag ${tag}
time ${time}
message ${hostname} ${tag_parts[-1]} ${record["message"]}
]
# With the shared CONFIG, the placeholder-driven fields are filled in while
# the original 'foo' and 'nest' entries remain in the record.
test 'typical usage' do
  msgs = %w[1 2]
  expected_time = Time.at(@time).localtime.to_s
  filter(CONFIG, msgs).each_with_index do |(_time, record), idx|
    assert_equal('bar', record['foo'])
    assert_equal(@hostname, record['hostname'])
    assert_equal(@tag, record['tag'])
    assert_equal(expected_time, record['time'])
    assert_equal("#{@hostname} #{@tag_parts[-1]} #{msgs[idx]}", record['message'])
    assert_equal({'k1' => 'v1', 'k2' => 'v2'}, record['nest'])
  end
end
# remove_keys drops the listed top-level keys while the other configured
# fields are still produced.
test 'remove_keys' do
  config = "#{CONFIG}remove_keys foo,message"
  expected_time = Time.at(@time).localtime.to_s
  filter(config).each do |_time, record|
    assert_not_include(record, 'foo')
    assert_equal(@hostname, record['hostname'])
    assert_equal(@tag, record['tag'])
    assert_equal(expected_time, record['time'])
    assert_not_include(record, 'message')
  end
end
# A `$.nest.k1` entry in remove_keys must remove 'k1' from the nested hash.
test 'remove_keys with nested key' do
  config = CONFIG + 'remove_keys $.nest.k1'
  filter(config).each do |_time, record|
    assert_not_include(record['nest'], 'k1')
  end
end
# With renew_record enabled, the original 'foo' key is absent and only the
# configured fields are asserted on the output record.
test 'renew_record' do
  msgs = %w[1 2]
  config = "#{CONFIG}renew_record true"
  expected_time = Time.at(@time).localtime.to_s
  filter(config, msgs).each_with_index do |(_time, record), idx|
    assert_not_include(record, 'foo')
    assert_equal(@hostname, record['hostname'])
    assert_equal(@tag, record['tag'])
    assert_equal(expected_time, record['time'])
    assert_equal("#{@hostname} #{@tag_parts[-1]} #{msgs[idx]}", record['message'])
  end
end
# renew_time_key takes the event time from the named record field: each
# message carries a float epoch string, and the emitted time must equal that
# instant (as an EventTime) while the source key stays in the record.
test 'renew_time_key' do
  config = %[renew_time_key message]
  # Ten-argument Time.local form: sec, min, hour, day, month, year, then four
  # trailing values passed as nil.
  times = [
    Time.local(2, 2, 3, 4, 5, 2010, nil, nil, nil, nil),
    Time.local(3, 2, 3, 4, 5, 2010, nil, nil, nil, nil),
  ]
  msgs = times.map { |t| t.to_f.to_s }
  filter(config, msgs).each_with_index do |(time, record), idx|
    assert_equal(times[idx].to_i, time)
    # assert_kind_of reports the actual class on failure, unlike a bare assert.
    assert_kind_of(Fluent::EventTime, time)
    assert_include(record, 'message')
  end
end
# renew_time_key combined with remove_keys: the time is taken from a field
# that is itself filled from record["message"], and that helper field must
# not appear in the output record.
test 'renew_time_key and remove_keys' do
  config = %[
renew_time_key event_time_key
remove_keys event_time_key
auto_typecast true
event_time_key ${record["message"]}
]
  # Ten-argument Time.local form: sec, min, hour, day, month, year, then four
  # trailing values passed as nil.
  times = [
    Time.local(2, 2, 3, 4, 5, 2010, nil, nil, nil, nil),
    Time.local(3, 2, 3, 4, 5, 2010, nil, nil, nil, nil),
  ]
  msgs = times.map { |t| t.to_f.to_s }
  filter(config, msgs).each_with_index do |(time, record), idx|
    assert_equal(times[idx].to_i, time)
    # assert_kind_of reports the actual class on failure, unlike a bare assert.
    assert_kind_of(Fluent::EventTime, time)
    assert_not_include(record, 'event_time_key')
  end
end
# With renew_record, keys listed in keep_keys keep their original values.
test 'keep_keys' do
  msgs = %w[1 2]
  config = "renew_record true\nkeep_keys foo,message"
  filter(config, msgs).each_with_index do |(_time, record), idx|
    assert_equal('bar', record['foo'])
    assert_equal(msgs[idx], record['message'])
  end
end
# keep_keys entries that the incoming record does not contain ('bar', 'baz')
# must not be added to the output; existing ones keep their values, including
# a nil message.
test 'keep_keys that are not present in the original record should not be included in the result record' do
  config = %[renew_record true\nkeep_keys foo, bar, baz, message]
  msgs = ['1', '2', nil]
  filter(config, msgs).each_with_index do |(_time, record), idx|
    assert_equal('bar', record['foo'])
    assert_equal(msgs[idx], record['message'])
    # Same absence assertion the other option tests use; on failure it prints
    # the record instead of a bare true/false mismatch.
    assert_not_include(record, 'bar')
    assert_not_include(record, 'baz')
  end
end
# With enable_ruby, a placeholder may hold a Ruby expression; here it wraps
# the incoming message in single quotes.
test 'enable_ruby' do
  config = %[
enable_ruby yes
message ${hostname} ${tag_parts.last} ${"'" + record["message"] + "'"}
]
  msgs = %w[1 2]
  filter(config, msgs).each_with_index do |(_time, record), idx|
    assert_equal("#{@hostname} #{@tag_parts[-1]} '#{msgs[idx]}'", record['message'])
  end
end
# A JSON-object value in the configuration must reach the record as a Hash.
test 'hash_value' do
  # A `%[` literal is closed by a bare `]`; closing it with `%]` would append
  # a stray `%` line to the configuration text.
  config = %[
hash_field {"k1":100, "k2":"foobar"}
]
  filter(config, %w[1 2]).each do |_time, record|
    assert_equal({"k1"=>100, "k2"=>"foobar"}, record['hash_field'])
  end
end
# A JSON-array value in the configuration must reach the record as an Array.
test 'array_value' do
  # A `%[` literal is closed by a bare `]`; closing it with `%]` would append
  # a stray `%` line to the configuration text.
  config = %[
array_field [1, 2, 3]
]
  filter(config, %w[1 2]).each do |_time, record|
    assert_equal([1,2,3], record['array_field'])
  end
end
# A JSON value mixing arrays and objects must keep its nested structure.
test 'array_hash_mixed' do
  # A `%[` literal is closed by a bare `]`; closing it with `%]` would append
  # a stray `%` line to the configuration text.
  config = %[
mixed_field {"hello":[1,2,3], "world":{"foo":"bar"}}
]
  filter(config, %w[1 2]).each do |_time, record|
    assert_equal({"hello"=>[1,2,3], "world"=>{"foo"=>"bar"}}, record['mixed_field'])
  end
end
end
sub_test_case 'test placeholders' do
# Runs the filter configured by +config+ and returns the driver's filtered
# events. If a block is given it receives the driver before the run (used by
# tests to attach mocks).
#
# @param config [String] plugin configuration text
# @param msgs [Array] each entry is either a complete record (Hash), fed
#   as-is, or a value wrapped as {'eventType0' => 'bar', 'message' => value}
# @yieldparam driver the configured test driver
# @return the value of the driver's #filtered
def filter(config, msgs = [''])
  driver = create_driver(config)
  yield driver if block_given?
  driver.run do
    msgs.each do |msg|
      record = msg.is_a?(Hash) ? msg : { 'eventType0' => 'bar', 'message' => msg }
      driver.feed(@tag, @time, record)
    end
  end
  driver.filtered
end
%w[yes no].each do |enable_ruby|
# ${hostname} must expand to the local hostname.
# (Test name spelling corrected: "enable_ruby".)
test "hostname with enable_ruby #{enable_ruby}" do
  config = %[
enable_ruby #{enable_ruby}
message ${hostname}
]
  filter(config).each do |_time, record|
    assert_equal(@hostname, record['message'])
  end
end
# ${tag} must expand to the event's tag.
test "tag with enable_ruby #{enable_ruby}" do
  config = %[
enable_ruby #{enable_ruby}
message ${tag}
]
  filter(config).each do |_time, record|
    assert_equal(@tag, record['message'])
  end
end
# ${tag_parts[N]} must index into the dot-separated tag, including negative
# indices.
test "tag_parts with enable_ruby #{enable_ruby}" do
  config = %[
enable_ruby #{enable_ruby}
message ${tag_parts[0]} ${tag_parts[-1]}
]
  parts = @tag.split('.')
  expected = "#{parts.first} #{parts.last}"
  filter(config).each do |_time, record|
    assert_equal(expected, record['message'])
  end
end
# ${tag_prefix[N]} / ${tag_suffix[N]} must produce the leading / trailing
# portions of a four-part tag, with positive and negative indices.
test "${tag_prefix[N]} and ${tag_suffix[N]} with enable_ruby #{enable_ruby}" do
  config = %[
enable_ruby #{enable_ruby}
message ${tag_prefix[1]} ${tag_prefix[-2]} ${tag_suffix[2]} ${tag_suffix[-3]}
]
  # Override the fixture tag so there are enough parts to slice.
  @tag = 'prefix.test.tag.suffix'
  expected = %w[prefix.test prefix.test.tag tag.suffix test.tag.suffix].join(' ')
  filter(config).each do |_time, record|
    assert_equal(expected, record['message'])
  end
end
# ${time} is compared against the local-time object when enable_ruby is
# "yes", and against its string form otherwise.
test "time with enable_ruby #{enable_ruby}" do
  config = %[
enable_ruby #{enable_ruby}
message ${time}
]
  local_time = Time.at(@time).localtime
  expected = enable_ruby == "yes" ? local_time : local_time.to_s
  filter(config).each do |_time, record|
    assert_equal(expected, record['message'])
  end
end
# ${record["..."]} must read values from the incoming record, even for a key
# that remove_keys then strips from the output.
test "record keys with enable_ruby #{enable_ruby}" do
  config = %[
enable_ruby #{enable_ruby}
remove_keys eventType0
message bar ${record["message"]}
eventtype ${record["eventType0"]}
]
  msgs = %w[1 2]
  filter(config, msgs).each_with_index do |(_time, record), idx|
    assert_not_include(record, 'eventType0')
    assert_equal("bar", record['eventtype'])
    assert_equal("bar #{msgs[idx]}", record['message'])
  end
end
# A record that itself has a 'tag' key must not shadow the ${tag}
# placeholder; the record's own value stays reachable via ${record["tag"]}.
test "Prevent overwriting reserved keys such as tag with enable_ruby #{enable_ruby}" do
  config = %[
enable_ruby #{enable_ruby}
new_tag ${tag}
new_record_tag ${record["tag"]}
]
  records = [{'tag' => 'tag', 'time' => 'time'}]
  filter(config, records).each do |_time, record|
    assert_not_equal('tag', record['new_tag'])
    assert_equal(@tag, record['new_tag'])
    assert_equal('tag', record['new_record_tag'])
  end
end
# Placeholders inside a JSON-object value must be expanded in both its keys
# and its values.
test "hash values with placeholders with enable_ruby #{enable_ruby}" do
  config = %[
enable_ruby #{enable_ruby}
hash_field {
"hostname":"${hostname}",
"tag":"${tag}",
"${tag}":100
}
]
  expected = {"hostname" => @hostname, "tag" => @tag, @tag => 100}
  filter(config, %w[1 2]).each do |_time, record|
    assert_equal(expected, record['hash_field'])
  end
end
# Placeholders inside a JSON-array value must be expanded element by element.
test "array values with placeholders with enable_ruby #{enable_ruby}" do
  config = %[
enable_ruby #{enable_ruby}
array_field ["${hostname}", "${tag}"]
]
  expected = [@hostname, @tag]
  filter(config, %w[1 2]).each do |_time, record|
    assert_equal(expected, record['array_field'])
  end
end
# Placeholders must also be expanded inside a hash nested in an array.
test "array and hash values with placeholders with enable_ruby #{enable_ruby}" do
  config = %[
enable_ruby #{enable_ruby}
mixed_field [{"tag":"${tag}"}]
]
  expected = [{"tag" => @tag}]
  filter(config, %w[1 2]).each do |_time, record|
    assert_equal(expected, record['mixed_field'])
  end
end
# Placeholders are expanded in field names too; with renew_record the output
# consists of exactly the two configured fields.
test "keys with placeholders with enable_ruby #{enable_ruby}" do
  config = %[
enable_ruby #{enable_ruby}
renew_record true
${hostname} hostname
foo.${tag} tag
]
  expected = {@hostname=>'hostname',"foo.#{@tag}"=>'tag'}
  filter(config, %w[1 2]).each do |_time, record|
    assert_equal(expected, record)
  end
end
# With auto_typecast off, every expanded value is expected in its string
# form, whatever the source value's type; text without `${}` is left alone.
test "disabled typecasting of values with enable_ruby #{enable_ruby}" do
  config = %[
auto_typecast false
enable_ruby #{enable_ruby}
single ${record["source"]}
multiple ${record["source"]}${record["source"]}
with_prefix prefix-${record["source"]}
with_suffix ${record["source"]}-suffix
with_quote record["source"][""]
]
  # Called once per use so fed records and expectations never share objects.
  sources = -> { ["string", 123, [1, 2], {a:1, b:2}, nil] }
  msgs = sources.call.map { |source| { "source" => source } }
  expected_results = sources.call.map do |source|
    text = source.to_s
    {
      single: text,
      multiple: "#{text}#{text}",
      with_prefix: "prefix-#{text}",
      with_suffix: "#{text}-suffix",
      with_quote: %Q{record["source"][""]},
    }
  end
  actual_results = filter(config, msgs).map do |_time, record|
    {
      single: record["single"],
      multiple: record["multiple"],
      with_prefix: record["with_prefix"],
      with_suffix: record["with_suffix"],
      with_quote: record["with_quote"],
    }
  end
  assert_equal(expected_results, actual_results)
end
# With auto_typecast on, a value made of a single placeholder is expected to
# keep the source value's type; values mixing placeholders with other text
# are expected as strings.
test "enabled typecasting of values with enable_ruby #{enable_ruby}" do
  config = %[
auto_typecast yes
enable_ruby #{enable_ruby}
single ${record["source"]}
multiple ${record["source"]}${record["source"]}
with_prefix prefix-${record["source"]}
with_suffix ${record["source"]}-suffix
]
  # Called once per use so fed records and expectations never share objects.
  sources = -> { ["string", 123, [1, 2], {a:1, b:2}, nil] }
  msgs = sources.call.map { |source| { "source" => source } }
  expected_results = sources.call.map do |source|
    text = source.to_s
    {
      single: source,
      multiple: "#{text}#{text}",
      with_prefix: "prefix-#{text}",
      with_suffix: "#{text}-suffix",
    }
  end
  actual_results = filter(config, msgs).map do |_time, record|
    {
      single: record["single"],
      multiple: record["multiple"],
      with_prefix: record["with_prefix"],
      with_suffix: record["with_suffix"],
    }
  end
  assert_equal(expected_results, actual_results)
end
end
# An unrecognised placeholder must produce a warning on the plugin's logger;
# the mock is attached before the driver runs.
test 'unknown placeholder (enable_ruby no)' do
  config = %[
enable_ruby no
message ${unknown}
]
  filter(config) do |driver|
    mock(driver.instance.log).warn("unknown placeholder `${unknown}` found")
  end
end
# A record key beginning with '@' must be readable through ${record["..."]}.
test 'expand fields starting with @ (enable_ruby no)' do
  config = %[
enable_ruby no
foo ${record["@timestamp"]}
]
  message = {"@timestamp" => "foo"}
  driver = create_driver(config)
  driver.run { driver.feed(@tag, @time, message) }
  driver.filtered.each do |_time, record|
    assert_equal(message["@timestamp"], record['foo'])
  end
end
# A Ruby placeholder that itself contains braces (a block) must be evaluated
# as a whole, and with auto_typecast its Array result is kept as an Array.
test 'auto_typecast placeholder containing {} (enable_ruby yes)' do
  config = %[
tag tag
enable_ruby yes
auto_typecast yes
foo ${record.map{|k,v|v}}
]
  message = {"@timestamp" => "foo"}
  driver = create_driver(config)
  driver.run { driver.feed(@tag, @time, message) }
  driver.filtered.each do |_time, record|
    assert_equal([message["@timestamp"]], record['foo'])
  end
end
end # test placeholders
# Error-path behaviour.
sub_test_case 'test error record' do
  # Indexing into a missing record key fails inside the placeholder; the
  # router must then receive exactly one emit_error_event call carrying a
  # RuntimeError.
  test 'invalid record for placeholders' do
    config = %[
enable_ruby yes
foo ${record["unknown"]["key"]}
]
    driver = create_driver(config)
    expectation = flexmock(driver.instance.router).should_receive(:emit_error_event)
    expectation.with(String, Fluent::EventTime, Hash, RuntimeError).once
    driver.run do
      driver.feed(@tag, Fluent::EventTime.now, {'key' => 'value'})
    end
  end
end
end