require "helper"
require "fluent/test/driver/output"
class SqlOutputTest < Test::Unit::TestCase
def setup
Fluent::Test.setup
end
def teardown
end
CONFIG = %[
host localhost
port 5432
adapter postgresql
database fluentd_test
username fluentd
password fluentd
schema_search_path public
remove_tag_prefix db
table logs
column_mapping timestamp:created_at,host:host,ident:ident,pid:pid,message:message
]
def create_driver(conf = CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::SQLOutput).configure(conf)
end
def test_configure
d = create_driver
expected = {
host: "localhost",
port: 5432,
adapter: "postgresql",
database: "fluentd_test",
username: "fluentd",
password: "fluentd",
schema_search_path: 'public',
remove_tag_suffix: /^db/,
enable_fallback: true,
pool: 5
}
actual = {
host: d.instance.host,
port: d.instance.port,
adapter: d.instance.adapter,
database: d.instance.database,
username: d.instance.username,
password: d.instance.password,
schema_search_path: d.instance.schema_search_path,
remove_tag_suffix: d.instance.remove_tag_prefix,
enable_fallback: d.instance.enable_fallback,
pool: d.instance.pool
}
assert_equal(expected, actual)
assert_empty(d.instance.tables)
default_table = d.instance.instance_variable_get(:@default_table)
assert_equal("logs", default_table.table)
end
def test_emit
d = create_driver
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
d.run(default_tag: 'test') do
d.feed(time, {"message" => "message1"})
d.feed(time, {"message" => "message2"})
end
default_table = d.instance.instance_variable_get(:@default_table)
model = default_table.instance_variable_get(:@model)
assert_equal(2, model.all.count)
messages = model.pluck(:message).sort
assert_equal(["message1", "message2"], messages)
end
class Fallback < self
def test_simple
d = create_driver
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
d.run(default_tag: 'test') do
d.feed(time, {"message" => "message1"})
d.feed(time, {"message" => "message2"})
default_table = d.instance.instance_variable_get(:@default_table)
model = default_table.instance_variable_get(:@model)
mock(model).import(anything).at_least(1) do
raise ActiveRecord::Import::MissingColumnError.new("dummy_table", "dummy_column")
end
mock(default_table).one_by_one_import(anything)
end
end
def test_limit
d = create_driver
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
d.run(default_tag: 'test') do
d.feed(time, {"message" => "message1"})
d.feed(time, {"message" => "message2"})
default_table = d.instance.instance_variable_get(:@default_table)
model = default_table.instance_variable_get(:@model)
mock(model).import([anything, anything]).once do
raise ActiveRecord::Import::MissingColumnError.new("dummy_table", "dummy_column")
end
mock(model).import([anything]).times(12) do
raise StandardError
end
assert_equal(5, default_table.instance_variable_get(:@num_retries))
end
end
end
end