require 'helper'
class BigQueryOutputTest < Test::Unit::TestCase
# Test::Unit hook: invokes Fluent::Test.setup ahead of every test in this class.
def setup
  Fluent::Test.setup
end
# Default plugin configuration text handed to create_driver when a test does
# not supply its own. It names a single table (`foo`), an email plus
# private-key path, project and dataset ids, time formatting keys, and a JSON
# schema that includes one nested RECORD column (`remote`).
# Everything between the brackets is part of the string, so no comments may
# be placed inside it.
CONFIG = %[
table foo
email foo@bar.example
private_key_path /path/to/key
project yourproject_id
dataset yourdataset_id
time_format %s
time_key time
schema [
{"name": "time", "type": "INTEGER"},
{"name": "status", "type": "INTEGER"},
{"name": "bytes", "type": "INTEGER"},
{"name": "vhost", "type": "STRING"},
{"name": "path", "type": "STRING"},
{"name": "method", "type": "STRING"},
{"name": "protocol", "type": "STRING"},
{"name": "agent", "type": "STRING"},
{"name": "referer", "type": "STRING"},
{"name": "remote", "type": "RECORD", "fields": [
{"name": "host", "type": "STRING"},
{"name": "ip", "type": "STRING"},
{"name": "user", "type": "STRING"}
]},
{"name": "requesttime", "type": "FLOAT"},
{"name": "bot_access", "type": "BOOLEAN"},
{"name": "loginsession", "type": "BOOLEAN"}
]
]
# OAuth scope string the auth tests expect to be passed to the Google
# credential factories. Frozen so that no test can mutate the shared value.
API_SCOPE = "https://www.googleapis.com/auth/bigquery".freeze
# Builds an output test driver around Fluent::Plugin::BigQueryOutput and
# configures it.
#
# @param conf [String] plugin configuration text; CONFIG when omitted
# @return the value returned by the driver's #configure call
def create_driver(conf = CONFIG)
  output_driver = Fluent::Test::Driver::Output.new(Fluent::Plugin::BigQueryOutput)
  output_driver.configure(conf)
end
# Fetches the writer of the driver's plugin instance, stubs its +get_auth+ to
# return nil, and hands the writer back.
#
# @param driver a configured test driver (see create_driver)
# @return the stubbed writer object
def stub_writer(driver)
  driver.instance.writer.tap do |bq_writer|
    stub(bq_writer).get_auth { nil }
  end
end
# Checks how the `table` / `tables` parameters are exposed on the plugin
# instance: with only `table` set, `tables` is nil; with only `tables` set,
# `table` is nil; with both present, configuration raises Fluent::ConfigError.
#
# assert_equal takes (expected, actual); the literals come first so that a
# failure message labels the two values correctly.
def test_configure_table
  driver = create_driver
  assert_equal 'foo', driver.instance.table
  assert_nil driver.instance.tables

  # Replace the `table ...` line of CONFIG with a comma-separated `tables` line.
  driver = create_driver(CONFIG.sub(/\btable\s+.*$/, 'tables foo,bar'))
  assert_nil driver.instance.table
  assert_equal ['foo', 'bar'], driver.instance.tables

  # CONFIG already contains `table`; appending `tables` supplies both at once.
  assert_raise(Fluent::ConfigError, "'table' or 'tables' must be specified, and both are invalid") {
    create_driver(CONFIG + "tables foo,bar")
  }
end
# Exercises the default CONFIG (email + private key path): the key is expected
# to be loaded via KeyUtils.load_from_pkcs12, a Signet OAuth2 client is
# expected to be built with that key, and the resulting object is expected to
# be assigned as the BigQuery service's authorization.
def test_configure_auth_private_key
  signing_key = stub!
  mock(Google::APIClient::KeyUtils).load_from_pkcs12('/path/to/key', 'notasecret') { signing_key }

  credentials = Object.new
  token_uri = "https://accounts.google.com/o/oauth2/token"
  # Stub any call to .new, then expect one call carrying exactly these options.
  stub(Signet::OAuth2::Client).new
  mock(Signet::OAuth2::Client).new(
    token_credential_uri: token_uri,
    audience: token_uri,
    scope: API_SCOPE,
    issuer: 'foo@bar.example',
    signing_key: signing_key
  ) { credentials }

  # Let the real service be constructed, but expect the credentials to be set on it.
  mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |service|
    mock(service).__send__(:authorization=, credentials) {}
    service
  end

  driver = create_driver
  mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash))
  driver.instance.writer
  bq_client = driver.instance.writer.client
  assert bq_client.is_a?(Google::Apis::BigqueryV2::BigqueryService)
end
# With `auth_method compute_engine`, Google::Auth::GCECredentials.new is
# expected to be called and its result assigned as the BigQuery service's
# authorization.
def test_configure_auth_compute_engine
  credentials = Object.new
  mock(Google::Auth::GCECredentials).new { credentials }

  # Let the real service be constructed, but expect the credentials to be set on it.
  mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |service|
    mock(service).__send__(:authorization=, credentials) {}
    service
  end

  conf = %[
table foo
auth_method compute_engine
project yourproject_id
dataset yourdataset_id
schema [
{"name": "time", "type": "INTEGER"},
{"name": "status", "type": "INTEGER"},
{"name": "bytes", "type": "INTEGER"}
]
]
  driver = create_driver(conf)
  mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash))
  driver.instance.writer
  bq_client = driver.instance.writer.client
  assert bq_client.is_a?(Google::Apis::BigqueryV2::BigqueryService)
end
# With `auth_method json_key` and `json_key` set to a file path,
# ServiceAccountCredentials.make_creds is expected to receive a `json_key_io`
# for that file together with API_SCOPE, and its result is expected to be
# assigned as the BigQuery service's authorization.
def test_configure_auth_json_key_as_file
  key_file_path = 'test/plugin/testdata/json_key.json'
  credentials = Object.new
  # NOTE(review): the File opened for this expectation is never closed
  # explicitly within the test.
  mock(Google::Auth::ServiceAccountCredentials).make_creds(json_key_io: File.open(key_file_path), scope: API_SCOPE) { credentials }

  # Let the real service be constructed, but expect the credentials to be set on it.
  mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |service|
    mock(service).__send__(:authorization=, credentials) {}
    service
  end

  conf = %[
table foo
auth_method json_key
json_key #{key_file_path}
project yourproject_id
dataset yourdataset_id
schema [
{"name": "time", "type": "INTEGER"},
{"name": "status", "type": "INTEGER"},
{"name": "bytes", "type": "INTEGER"}
]
]
  driver = create_driver(conf)
  mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash))
  driver.instance.writer
  bq_client = driver.instance.writer.client
  assert bq_client.is_a?(Google::Apis::BigqueryV2::BigqueryService)
end
# Removes all permissions from the directory holding the JSON key fixture and
# asserts that building the writer's client raises Errno::EACCES.
#
# The directory's real permission bits are recorded first and restored in the
# ensure clause, so the test leaves the fixture directory as it found it
# instead of forcing it to 0755.
#
# NOTE(review): presumably this cannot pass when the suite runs as root, since
# root is typically not subject to permission bits — confirm for CI.
def test_configure_auth_json_key_as_file_raise_permission_error
  json_key_path = 'test/plugin/testdata/json_key.json'
  json_key_path_dir = File.dirname(json_key_path)
  # Keep only the permission bits; File::Stat#mode also carries the file type.
  original_mode = File.stat(json_key_path_dir).mode & 0o7777

  begin
    File.chmod(0o000, json_key_path_dir)

    driver = create_driver(%[
table foo
auth_method json_key
json_key #{json_key_path}
project yourproject_id
dataset yourdataset_id
schema [
{"name": "time", "type": "INTEGER"},
{"name": "status", "type": "INTEGER"},
{"name": "bytes", "type": "INTEGER"}
]
])
    assert_raises(Errno::EACCES) do
      driver.instance.writer.client
    end
  ensure
    # Always give the directory back its recorded mode, even on failure.
    File.chmod(original_mode, json_key_path_dir)
  end
end
# With `auth_method json_key` and `json_key` set to inline JSON text,
# ServiceAccountCredentials.make_creds is expected to receive a `json_key_io`
# whose content parses to the same JSON, together with API_SCOPE; its result
# is expected to be assigned as the BigQuery service's authorization.
#
# The expected JSON is parsed once up front. Reading it from a StringIO inside
# the matcher would only work on the first invocation, because a drained IO
# yields "" and JSON.parse("") raises.
def test_configure_auth_json_key_as_string
  json_key = '{"private_key": "X", "client_email": "' + 'x' * 255 + '@developer.gserviceaccount.com"}'
  expected_key = JSON.parse(json_key)
  authorization = Object.new
  mock(Google::Auth::ServiceAccountCredentials).make_creds(json_key_io: satisfy {|arg| JSON.parse(arg.read) == expected_key }, scope: API_SCOPE) { authorization }

  # Let the real service be constructed, but expect the credentials to be set on it.
  mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl|
    mock(cl).__send__(:authorization=, authorization) {}
    cl
  end

  driver = create_driver(%[
table foo
auth_method json_key
json_key #{json_key}
project yourproject_id
dataset yourdataset_id
schema [
{"name": "time", "type": "INTEGER"},
{"name": "status", "type": "INTEGER"},
{"name": "bytes", "type": "INTEGER"}
]
])
  mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash))
  driver.instance.writer
  assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService)
end
# With `auth_method application_default`, Google::Auth.get_application_default
# is expected to be called with [API_SCOPE] and its result assigned as the
# BigQuery service's authorization.
def test_configure_auth_application_default
  credentials = Object.new
  mock(Google::Auth).get_application_default([API_SCOPE]) { credentials }

  # Let the real service be constructed, but expect the credentials to be set on it.
  mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |service|
    mock(service).__send__(:authorization=, credentials) {}
    service
  end

  conf = %[
table foo
auth_method application_default
project yourproject_id
dataset yourdataset_id
schema [
{"name": "time", "type": "INTEGER"},
{"name": "status", "type": "INTEGER"},
{"name": "bytes", "type": "INTEGER"}
]
]
  driver = create_driver(conf)
  mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash))
  driver.instance.writer
  bq_client = driver.instance.writer.client
  assert bq_client.is_a?(Google::Apis::BigqueryV2::BigqueryService)
end
# Formats one record through the plugin (configured with CONFIG) and compares
# the JSON-decoded result with the expected hash: a "time" key is added,
# "status"/"bytes" become integers, "vhost" becomes a string, "requesttime"
# becomes a float, and the remaining keys come through unchanged.
def test_format
  now = Fluent::EventTime.new(Time.now.to_i)
  request_time = (now - 1).to_f.to_s

  record = {
    "status" => "1",
    "bytes" => 3.0,
    "vhost" => :bar,
    "path" => "/path/to/baz",
    "method" => "GET",
    "protocol" => "HTTP/0.9",
    "agent" => "libwww",
    "referer" => "http://referer.example",
    "requesttime" => request_time,
    "bot_access" => true,
    "loginsession" => false,
    "something-else" => "would be ignored",
    "yet-another" => { "foo" => "bar", "baz" => 1 },
    "remote" => {
      "host" => "remote.example",
      "ip" => "192.0.2.1",
      "port" => 12345,
      "user" => "tagomoris"
    }
  }

  # Written out as an independent literal so it shares no nested objects
  # with the record handed to the plugin.
  expected = {
    "time" => now.to_i,
    "status" => 1,
    "bytes" => 3,
    "vhost" => "bar",
    "path" => "/path/to/baz",
    "method" => "GET",
    "protocol" => "HTTP/0.9",
    "agent" => "libwww",
    "referer" => "http://referer.example",
    "requesttime" => request_time.to_f,
    "bot_access" => true,
    "loginsession" => false,
    "something-else" => "would be ignored",
    "yet-another" => { "foo" => "bar", "baz" => 1 },
    "remote" => {
      "host" => "remote.example",
      "ip" => "192.0.2.1",
      "port" => 12345,
      "user" => "tagomoris"
    }
  }

  driver = create_driver(CONFIG)
  formatted = nil
  driver.run do
    formatted = driver.instance.format("my.tag", now, record)
  end

  assert_equal expected, MultiJson.load(formatted)
end
[
# ,