require 'helper'
require 'google/apis/bigquery_v2'
require 'google/api_client/auth/key_utils'
require 'googleauth'
require 'active_support/json'
require 'active_support/core_ext/hash'
require 'active_support/core_ext/object/json'

# Tests for Fluent::BigQueryOutput covering configuration validation
# (table/tables, field name lists), construction of the BigQuery client for
# each supported auth_method, and record formatting via format_stream.
#
# The auth tests never talk to Google: credential factories are replaced with
# test doubles and the tests only check which factory was called and that its
# result was assigned to the BigqueryService client.
class BigQueryOutputTest < Test::Unit::TestCase
  # Runs Fluent::Test.setup before every test.
  def setup
    Fluent::Test.setup
  end

  # Baseline plugin configuration reused by the tests below: one table,
  # email + private_key_path credentials, a time field and typed field lists.
  #
  # NOTE(review): this literal (and the other %[...] configs in this file) is
  # reproduced exactly as it appears in the reviewed copy, i.e. on one line.
  # test_configure_table's /\btable\s+.*$/ substitution looks like it expects
  # each directive on its own line -- confirm the line breaks against the
  # repository before relying on this text.
  CONFIG = %[ table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time field_integer time,status,bytes field_string vhost,path,method,protocol,agent,referer,remote.host,remote.ip,remote.user field_float requesttime field_boolean bot_access,loginsession ]

  # OAuth scope the auth tests expect to be handed to the credential factories.
  API_SCOPE = "https://www.googleapis.com/auth/bigquery"

  # Builds a time-sliced output test driver around Fluent::BigQueryOutput and
  # configures it.
  #
  # @param conf [String] plugin configuration text (defaults to CONFIG)
  # @return the result of #configure -- presumably the driver itself, since
  #   every caller goes on to use `.instance` on it
  # @raise [Fluent::ConfigError] for configurations the plugin rejects (see
  #   test_configure_table and test_configure_invalid_fieldname)
  def create_driver(conf = CONFIG)
    Fluent::Test::TimeSlicedOutputTestDriver.new(Fluent::BigQueryOutput).configure(conf)
  end

  # Returns the plugin instance's writer with get_auth stubbed to return nil.
  #
  # @param driver a driver as returned by create_driver
  # @return the stubbed writer object
  def stub_writer(driver)
    writer = driver.instance.writer
    stub(writer).get_auth { nil }
    writer
  end

  # `table` and `tables` are alternatives: exactly one is set after configure,
  # and supplying both raises Fluent::ConfigError.
  #
  # NOTE(review): test-unit documents assert_equal as (expected, actual); the
  # calls below pass the actual value first, which only affects how a failure
  # message reads.
  def test_configure_table
    driver = create_driver
    assert_equal driver.instance.table, 'foo'
    assert_nil driver.instance.tables

    # Swap the `table` directive for a `tables` directive.
    driver = create_driver(CONFIG.sub(/\btable\s+.*$/, 'tables foo,bar'))
    assert_nil driver.instance.table
    assert_equal driver.instance.tables, 'foo,bar'

    # Both directives present at once must be rejected.
    assert_raise(Fluent::ConfigError, "'table' or 'tables' must be specified, and both are invalid") {
      create_driver(CONFIG + "tables foo,bar")
    }
  end

  # With CONFIG (email + private_key_path), the client is expected to be
  # authorized with a Signet OAuth2 client built from the PKCS12 key.
  def test_configure_auth_private_key
    # Stand-in object for the loaded signing key.
    key = stub!
    mock(Google::APIClient::KeyUtils).load_from_pkcs12('/path/to/key', 'notasecret') { key }
    authorization = Object.new
    # Catch-all stub for Client.new, followed by the expectation for the exact
    # keyword arguments; the expected call returns our stand-in authorization.
    stub(Signet::OAuth2::Client).new
    mock(Signet::OAuth2::Client).new(
      token_credential_uri: "https://accounts.google.com/o/oauth2/token",
      audience: "https://accounts.google.com/o/oauth2/token",
      scope: API_SCOPE,
      issuer: 'foo@bar.example',
      signing_key: key) { authorization }

    # Expect BigqueryService.new to be called (any arguments); the block gets
    # the constructed client, expects `authorization=` to be called on it with
    # the stand-in, and hands the client back.
    mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl|
      mock(cl).__send__(:authorization=, authorization) {}
      cl
    end

    driver = create_driver
    # Expect the Writer to be constructed with an object responding to
    # info/error/warn (presumably the plugin logger), the configured
    # auth_method, and a Hash.
    mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash))
    driver.instance.writer
    assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService)
  end

  # auth_method compute_engine: the client is expected to be authorized with
  # the object returned by Google::Auth::GCECredentials.new.
  def test_configure_auth_compute_engine
    authorization = Object.new
    mock(Google::Auth::GCECredentials).new { authorization }

    # Same client expectation as in test_configure_auth_private_key.
    mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl|
      mock(cl).__send__(:authorization=, authorization) {}
      cl
    end

    driver = create_driver(%[ table foo auth_method compute_engine project yourproject_id dataset yourdataset_id field_integer time,status,bytes ])
    mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash))
    driver.instance.writer
    assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService)
  end

  # auth_method json_key with a file path: make_creds is expected to be called
  # with an IO for the key file and API_SCOPE.
  def test_configure_auth_json_key_as_file
    json_key_path = 'test/plugin/testdata/json_key.json'
    authorization = Object.new
    # NOTE(review): the File opened here to describe the expected argument is
    # never closed explicitly in this test.
    mock(Google::Auth::ServiceAccountCredentials).make_creds(json_key_io: File.open(json_key_path), scope: API_SCOPE) { authorization }

    mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl|
      mock(cl).__send__(:authorization=, authorization) {}
      cl
    end

    driver = create_driver(%[ table foo auth_method json_key json_key #{json_key_path} project yourproject_id dataset yourdataset_id field_integer time,status,bytes ])
    mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash))
    driver.instance.writer
    assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService)
  end

  # auth_method json_key with a path whose directory has had all permission
  # bits removed: building the client is expected to raise Errno::EACCES.
  #
  # NOTE(review): presumably this does not raise when the suite runs as root,
  # since root is usually not bound by directory permission bits -- confirm
  # for the CI user.
  def test_configure_auth_json_key_as_file_raise_permission_error
    json_key_path = 'test/plugin/testdata/json_key.json'
    json_key_path_dir = File.dirname(json_key_path)

    begin
      File.chmod(0000, json_key_path_dir)

      driver = create_driver(%[ table foo auth_method json_key json_key #{json_key_path} project yourproject_id dataset yourdataset_id field_integer time,status,bytes ])
      assert_raises(Errno::EACCES) do
        driver.instance.writer.client
      end
    ensure
      # Always reset the fixture directory to 0755 so later tests can read it.
      File.chmod(0755, json_key_path_dir)
    end
  end

  # auth_method json_key with the JSON document given inline: the string is
  # expected to be wrapped in a StringIO that is then passed to make_creds.
  def test_configure_auth_json_key_as_string
    json_key = '{"private_key": "X", "client_email": "xxx@developer.gserviceaccount.com"}'
    # Build the StringIO before mocking StringIO.new, then make the mocked
    # constructor return it; the make_creds expectation below is written
    # against this same object.
    json_key_io = StringIO.new(json_key)
    mock(StringIO).new(json_key) { json_key_io }
    authorization = Object.new
    mock(Google::Auth::ServiceAccountCredentials).make_creds(json_key_io: json_key_io, scope: API_SCOPE) { authorization }

    mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl|
      mock(cl).__send__(:authorization=, authorization) {}
      cl
    end

    driver = create_driver(%[ table foo auth_method json_key json_key #{json_key} project yourproject_id dataset yourdataset_id field_integer time,status,bytes ])
    mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash))
    driver.instance.writer
    assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService)
  end

  # auth_method application_default: the client is expected to be authorized
  # with the result of Google::Auth.get_application_default([API_SCOPE]).
  def test_configure_auth_application_default
    authorization = Object.new
    mock(Google::Auth).get_application_default([API_SCOPE]) { authorization }

    mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl|
      mock(cl).__send__(:authorization=, authorization) {}
      cl
    end

    # The line break inside this literal is kept exactly where the reviewed
    # copy has it.
    driver = create_driver(%[ table foo auth_method application_default project yourproject_id dataset yourdataset_id 
field_integer time,status,bytes ])
    mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash))
    driver.instance.writer
    assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService)
  end

  # Field lists written with spaces around the commas must yield field names
  # without the surrounding whitespace; dotted names (remote.host, ...) are
  # looked up as nested entries under their parent name.
  def test_configure_fieldname_stripped
    driver = create_driver(%[ table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time field_integer time , status , bytes field_string _log_name, vhost, path, method, protocol, agent, referer, remote.host, remote.ip, remote.user field_float requesttime field_boolean bot_access , loginsession ])
    # Reach into the plugin instance for its @fields table.
    fields = driver.instance.instance_eval{ @fields }

    assert (not fields['time ']), "tailing spaces must be stripped"
    assert fields['time']
    assert fields['status']
    assert fields['bytes']
    assert fields['_log_name']
    assert fields['vhost']
    assert fields['protocol']
    assert fields['agent']
    assert fields['referer']
    assert fields['remote']['host']
    assert fields['remote']['ip']
    assert fields['remote']['user']
    assert fields['requesttime']
    assert fields['bot_access']
    assert fields['loginsession']
  end

  # Each field_* directive appended to the base configuration below carries a
  # name the plugin is expected to reject with Fluent::ConfigError.
  def test_configure_invalid_fieldname
    base = %[ table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time ]

    # Names containing a space.
    assert_raises(Fluent::ConfigError) do
      create_driver(base + "field_integer time field\n")
    end
    assert_raises(Fluent::ConfigError) do
      create_driver(base + "field_string my name\n")
    end
    assert_raises(Fluent::ConfigError) do
      create_driver(base + "field_string remote.host name\n")
    end
    # Name starting with a digit.
    assert_raises(Fluent::ConfigError) do
      create_driver(base + "field_string 1column\n")
    end
    # 129-character name (10 * 12 + 9) -- presumably one past the allowed
    # maximum length; confirm against the plugin's validation.
    assert_raises(Fluent::ConfigError) do
      create_driver(base + "field_string #{'tenstrings' * 12 + '123456789'}\n")
    end
    assert_raises(Fluent::ConfigError) do
      create_driver(base + "field_float request time\n")
    end
    assert_raises(Fluent::ConfigError) do
      create_driver(base +
        "field_boolean login session\n")
    end
  end

  # Formats a single [time, record] event with CONFIG and compares the
  # MessagePack-decoded buffer with the expected row.
  #
  # As spelled out by `expected`: the row is wrapped under "json", "time" is
  # added as the event time in integer seconds, values of declared fields are
  # converted to their declared type ("1" -> 1, 3.0 -> 3, :bar -> "bar",
  # numeric string -> Float), and keys with no declared type are expected to
  # come through unchanged (the sample value "would be ignored"
  # notwithstanding).
  def test_format_stream
    now = Time.now
    input = [
      now,
      {
        "status" => "1",
        "bytes" => 3.0,
        "vhost" => :bar,
        "path" => "/path/to/baz",
        "method" => "GET",
        "protocol" => "HTTP/0.9",
        "agent" => "libwww",
        "referer" => "http://referer.example",
        "requesttime" => (now - 1).to_f.to_s,
        "bot_access" => true,
        "loginsession" => false,
        "something-else" => "would be ignored",
        "yet-another" => {
          "foo" => "bar",
          "baz" => 1,
        },
        "remote" => {
          "host" => "remote.example",
          "ip" => "192.0.2.1",
          "port" => 12345,
          "user" => "tagomoris",
        }
      }
    ]
    expected = {
      "json" => {
        "time" => now.to_i,
        "status" => 1,
        "bytes" => 3,
        "vhost" => "bar",
        "path" => "/path/to/baz",
        "method" => "GET",
        "protocol" => "HTTP/0.9",
        "agent" => "libwww",
        "referer" => "http://referer.example",
        "requesttime" => (now - 1).to_f.to_s.to_f,
        "bot_access" => true,
        "loginsession" => false,
        "something-else" => "would be ignored",
        "yet-another" => {
          "foo" => "bar",
          "baz" => 1,
        },
        "remote" => {
          "host" => "remote.example",
          "ip" => "192.0.2.1",
          "port" => 12345,
          "user" => "tagomoris",
        }
      }
    }

    driver = create_driver(CONFIG)
    driver.instance.start
    buf = driver.instance.format_stream("my.tag", [input])
    driver.instance.shutdown

    assert_equal expected, MessagePack.unpack(buf)
  end

  [ # ,