require 'helper' class BigQueryOutputTest < Test::Unit::TestCase def setup Fluent::Test.setup end CONFIG = %[ table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_key time schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"}, {"name": "vhost", "type": "STRING"}, {"name": "path", "type": "STRING"}, {"name": "method", "type": "STRING"}, {"name": "protocol", "type": "STRING"}, {"name": "agent", "type": "STRING"}, {"name": "referer", "type": "STRING"}, {"name": "remote", "type": "RECORD", "fields": [ {"name": "host", "type": "STRING"}, {"name": "ip", "type": "STRING"}, {"name": "user", "type": "STRING"} ]}, {"name": "requesttime", "type": "FLOAT"}, {"name": "bot_access", "type": "BOOLEAN"}, {"name": "loginsession", "type": "BOOLEAN"} ] ] API_SCOPE = "https://www.googleapis.com/auth/bigquery" def create_driver(conf = CONFIG) Fluent::Test::Driver::Output.new(Fluent::Plugin::BigQueryOutput).configure(conf) end def stub_writer(driver) writer = driver.instance.writer stub(writer).get_auth { nil } writer end def test_configure_table driver = create_driver assert_equal driver.instance.table, 'foo' assert_nil driver.instance.tables driver = create_driver(CONFIG.sub(/\btable\s+.*$/, 'tables foo,bar')) assert_nil driver.instance.table assert_equal driver.instance.tables, ['foo' ,'bar'] assert_raise(Fluent::ConfigError, "'table' or 'tables' must be specified, and both are invalid") { create_driver(CONFIG + "tables foo,bar") } end def test_configure_auth_private_key key = stub! mock(Google::APIClient::KeyUtils).load_from_pkcs12('/path/to/key', 'notasecret') { key } authorization = Object.new stub(Signet::OAuth2::Client).new mock(Signet::OAuth2::Client).new( token_credential_uri: "https://accounts.google.com/o/oauth2/token", audience: "https://accounts.google.com/o/oauth2/token", scope: API_SCOPE, issuer: 'foo@bar.example', signing_key: key) { authorization } mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl| mock(cl).__send__(:authorization=, authorization) {} cl end driver = create_driver mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash)) driver.instance.writer assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService) end def test_configure_auth_compute_engine authorization = Object.new mock(Google::Auth::GCECredentials).new { authorization } mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl| mock(cl).__send__(:authorization=, authorization) {} cl end driver = create_driver(%[ table foo auth_method compute_engine project yourproject_id dataset yourdataset_id schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"} ] ]) mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash)) driver.instance.writer assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService) end def test_configure_auth_json_key_as_file json_key_path = 'test/plugin/testdata/json_key.json' authorization = Object.new mock(Google::Auth::ServiceAccountCredentials).make_creds(json_key_io: File.open(json_key_path), scope: API_SCOPE) { authorization } mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl| mock(cl).__send__(:authorization=, authorization) {} cl end driver = create_driver(%[ table foo auth_method json_key json_key #{json_key_path} project yourproject_id dataset yourdataset_id schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"} ] ]) mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash)) driver.instance.writer assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService) end def test_configure_auth_json_key_as_file_raise_permission_error json_key_path = 'test/plugin/testdata/json_key.json' json_key_path_dir = File.dirname(json_key_path) begin File.chmod(0000, json_key_path_dir) driver = create_driver(%[ table foo auth_method json_key json_key #{json_key_path} project yourproject_id dataset yourdataset_id schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"} ] ]) assert_raises(Errno::EACCES) do driver.instance.writer.client end ensure File.chmod(0755, json_key_path_dir) end end def test_configure_auth_json_key_as_string json_key = '{"private_key": "X", "client_email": "' + 'x' * 255 + '@developer.gserviceaccount.com"}' json_key_io = StringIO.new(json_key) authorization = Object.new mock(Google::Auth::ServiceAccountCredentials).make_creds(json_key_io: satisfy {|arg| JSON.parse(arg.read) == JSON.parse(json_key_io.read) }, scope: API_SCOPE) { authorization } mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl| mock(cl).__send__(:authorization=, authorization) {} cl end driver = create_driver(%[ table foo auth_method json_key json_key #{json_key} project yourproject_id dataset yourdataset_id schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"} ] ]) mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash)) driver.instance.writer assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService) end def test_configure_auth_application_default authorization = Object.new mock(Google::Auth).get_application_default([API_SCOPE]) { authorization } mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl| mock(cl).__send__(:authorization=, authorization) {} cl end driver = create_driver(%[ table foo auth_method application_default project yourproject_id dataset yourdataset_id schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"} ] ]) mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash)) driver.instance.writer assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService) end def test_format now = Fluent::EventTime.new(Time.now.to_i) input = { "status" => "1", "bytes" => 3.0, "vhost" => :bar, "path" => "/path/to/baz", "method" => "GET", "protocol" => "HTTP/0.9", "agent" => "libwww", "referer" => "http://referer.example", "requesttime" => (now - 1).to_f.to_s, "bot_access" => true, "loginsession" => false, "something-else" => "would be ignored", "yet-another" => { "foo" => "bar", "baz" => 1, }, "remote" => { "host" => "remote.example", "ip" => "192.0.2.1", "port" => 12345, "user" => "tagomoris", } } expected = { "time" => now.to_i, "status" => 1, "bytes" => 3, "vhost" => "bar", "path" => "/path/to/baz", "method" => "GET", "protocol" => "HTTP/0.9", "agent" => "libwww", "referer" => "http://referer.example", "requesttime" => (now - 1).to_f.to_s.to_f, "bot_access" => true, "loginsession" => false, "something-else" => "would be ignored", "yet-another" => { "foo" => "bar", "baz" => 1, }, "remote" => { "host" => "remote.example", "ip" => "192.0.2.1", "port" => 12345, "user" => "tagomoris", } } driver = create_driver(CONFIG) buf = nil driver.run { buf = driver.instance.format("my.tag", now, input) } assert_equal expected, MultiJson.load(buf) end [ # ,