require 'helper'
class BigQueryBaseOutputTest < Test::Unit::TestCase
def setup
Fluent::Test.setup
end
CONFIG = %[
table foo
email foo@bar.example
private_key_path /path/to/key
project yourproject_id
dataset yourdataset_id
time_format %s
time_key time
schema [
{"name": "time", "type": "INTEGER"},
{"name": "status", "type": "INTEGER"},
{"name": "bytes", "type": "INTEGER"},
{"name": "vhost", "type": "STRING"},
{"name": "path", "type": "STRING"},
{"name": "method", "type": "STRING"},
{"name": "protocol", "type": "STRING"},
{"name": "agent", "type": "STRING"},
{"name": "referer", "type": "STRING"},
{"name": "remote", "type": "RECORD", "fields": [
{"name": "host", "type": "STRING"},
{"name": "ip", "type": "STRING"},
{"name": "user", "type": "STRING"}
]},
{"name": "requesttime", "type": "FLOAT"},
{"name": "bot_access", "type": "BOOLEAN"},
{"name": "loginsession", "type": "BOOLEAN"}
]
]
API_SCOPE = "https://www.googleapis.com/auth/bigquery"
def create_driver(conf = CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::BigQueryBaseOutput).configure(conf)
end
def stub_writer(stub_auth: true)
stub.proxy(Fluent::BigQuery::Writer).new.with_any_args do |writer|
stub(writer).get_auth { nil } if stub_auth
yield writer
writer
end
end
private def sudo_schema_response
{
"schema" => {
"fields" => [
{
"name" => "time",
"type" => "TIMESTAMP",
"mode" => "REQUIRED"
},
{
"name" => "tty",
"type" => "STRING",
"mode" => "NULLABLE"
},
{
"name" => "pwd",
"type" => "STRING",
"mode" => "REQUIRED"
},
{
"name" => "user",
"type" => "STRING",
"mode" => "REQUIRED"
},
{
"name" => "argv",
"type" => "STRING",
"mode" => "REPEATED"
}
]
}
}
end
def test_configure_table
driver = create_driver
assert_equal driver.instance.table, 'foo'
assert_nil driver.instance.tables
driver = create_driver(CONFIG.sub(/\btable\s+.*$/, 'tables foo,bar'))
assert_nil driver.instance.table
assert_equal driver.instance.tables, ['foo' ,'bar']
assert_raise(Fluent::ConfigError, "'table' or 'tables' must be specified, and both are invalid") {
create_driver(CONFIG + "tables foo,bar")
}
end
def test_configure_auth_private_key
driver = create_driver
stub_writer(stub_auth: false) do |writer|
mock(writer).get_auth_from_private_key { stub! }
end
assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService)
end
def test_configure_auth_compute_engine
driver = create_driver(%[
table foo
auth_method compute_engine
project yourproject_id
dataset yourdataset_id
schema [
{"name": "time", "type": "INTEGER"},
{"name": "status", "type": "INTEGER"},
{"name": "bytes", "type": "INTEGER"}
]
])
stub_writer(stub_auth: false) do |writer|
mock(writer).get_auth_from_compute_engine { stub! }
end
assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService)
end
def test_configure_auth_json_key_as_file
driver = create_driver(%[
table foo
auth_method json_key
json_key jsonkey.josn
project yourproject_id
dataset yourdataset_id
schema [
{"name": "time", "type": "INTEGER"},
{"name": "status", "type": "INTEGER"},
{"name": "bytes", "type": "INTEGER"}
]
])
stub_writer(stub_auth: false) do |writer|
mock(writer).get_auth_from_json_key { stub! }
end
assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService)
end
def test_configure_auth_json_key_as_string
json_key = '{"private_key": "X", "client_email": "' + 'x' * 255 + '@developer.gserviceaccount.com"}'
json_key_io = StringIO.new(json_key)
authorization = Object.new
stub(Google::Auth::ServiceAccountCredentials).make_creds(json_key_io: satisfy {|arg| JSON.parse(arg.read) == JSON.parse(json_key_io.read) }, scope: API_SCOPE) { authorization }
driver = create_driver(%[
table foo
auth_method json_key
json_key #{json_key}
project yourproject_id
dataset yourdataset_id
schema [
{"name": "time", "type": "INTEGER"},
{"name": "status", "type": "INTEGER"},
{"name": "bytes", "type": "INTEGER"}
]
])
stub_writer(stub_auth: false) do |writer|
mock.proxy(writer).get_auth_from_json_key { stub! }
end
assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService)
end
def test_configure_auth_application_default
omit "This testcase depends on some environment variables." if ENV["CI"] == "true"
driver = create_driver(%[
table foo
auth_method application_default
project yourproject_id
dataset yourdataset_id
schema [
{"name": "time", "type": "INTEGER"},
{"name": "status", "type": "INTEGER"},
{"name": "bytes", "type": "INTEGER"}
]
])
stub_writer(stub_auth: false) do |writer|
mock.proxy(writer).get_auth_from_application_default { stub! }
end
assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService)
end
def test_format
now = Fluent::EventTime.new(Time.now.to_i)
input = {
"status" => "1",
"bytes" => 3.0,
"vhost" => :bar,
"path" => "/path/to/baz",
"method" => "GET",
"protocol" => "HTTP/0.9",
"agent" => "libwww",
"referer" => "http://referer.example",
"requesttime" => (now - 1).to_f.to_s,
"bot_access" => true,
"loginsession" => false,
"something-else" => "would be ignored",
"yet-another" => {
"foo" => "bar",
"baz" => 1,
},
"remote" => {
"host" => "remote.example",
"ip" => "192.0.2.1",
"port" => 12345,
"user" => "tagomoris",
}
}
expected = {
"time" => now.to_i,
"status" => 1,
"bytes" => 3,
"vhost" => "bar",
"path" => "/path/to/baz",
"method" => "GET",
"protocol" => "HTTP/0.9",
"agent" => "libwww",
"referer" => "http://referer.example",
"requesttime" => (now - 1).to_f.to_s.to_f,
"bot_access" => true,
"loginsession" => false,
"something-else" => "would be ignored",
"yet-another" => {
"foo" => "bar",
"baz" => 1,
},
"remote" => {
"host" => "remote.example",
"ip" => "192.0.2.1",
"port" => 12345,
"user" => "tagomoris",
}
}
driver = create_driver(CONFIG)
buf = nil
driver.run { buf = driver.instance.format("my.tag", now, input) }
assert_equal expected, MultiJson.load(buf)
end
[
# ,