require 'helper' require 'google/apis/bigquery_v2' require 'google/api_client/auth/key_utils' require 'googleauth' require 'active_support/json' require 'active_support/core_ext/hash' require 'active_support/core_ext/object/json' class BigQueryOutputTest < Test::Unit::TestCase def setup Fluent::Test.setup end CONFIG = %[ table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"}, {"name": "vhost", "type": "STRING"}, {"name": "path", "type": "STRING"}, {"name": "method", "type": "STRING"}, {"name": "protocol", "type": "STRING"}, {"name": "agent", "type": "STRING"}, {"name": "referer", "type": "STRING"}, {"name": "remote", "type": "RECORD", "fields": [ {"name": "host", "type": "STRING"}, {"name": "ip", "type": "STRING"}, {"name": "user", "type": "STRING"} ]}, {"name": "requesttime", "type": "FLOAT"}, {"name": "bot_access", "type": "BOOLEAN"}, {"name": "loginsession", "type": "BOOLEAN"} ] ] API_SCOPE = "https://www.googleapis.com/auth/bigquery" def create_driver(conf = CONFIG) Fluent::Test::TimeSlicedOutputTestDriver.new(Fluent::BigQueryOutput).configure(conf, true) end def stub_writer(driver) writer = driver.instance.writer stub(writer).get_auth { nil } writer end # ref. https://github.com/GoogleCloudPlatform/google-cloud-ruby/blob/ea2be47beb32615b2bf69f8a846a684f86c8328c/google-cloud-bigquery/test/google/cloud/bigquery/table_insert_test.rb#L141 def failure_insert_errors(reason, error_count, insert_error_count) error = Google::Apis::BigqueryV2::ErrorProto.new( reason: reason ) insert_error = Google::Apis::BigqueryV2::InsertAllTableDataResponse::InsertError.new( errors: [].fill(error, 0, error_count) ) res = Google::Apis::BigqueryV2::InsertAllTableDataResponse.new( insert_errors: [].fill(insert_error, 0, insert_error_count) ) return res end def test_configure_table driver = create_driver assert_equal driver.instance.table, 'foo' assert_nil driver.instance.tables driver = create_driver(CONFIG.sub(/\btable\s+.*$/, 'tables foo,bar')) assert_nil driver.instance.table assert_equal driver.instance.tables, 'foo,bar' assert_raise(Fluent::ConfigError, "'table' or 'tables' must be specified, and both are invalid") { create_driver(CONFIG + "tables foo,bar") } end def test_configure_auth_private_key key = stub! mock(Google::APIClient::KeyUtils).load_from_pkcs12('/path/to/key', 'notasecret') { key } authorization = Object.new stub(Signet::OAuth2::Client).new mock(Signet::OAuth2::Client).new( token_credential_uri: "https://accounts.google.com/o/oauth2/token", audience: "https://accounts.google.com/o/oauth2/token", scope: API_SCOPE, issuer: 'foo@bar.example', signing_key: key) { authorization } mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl| mock(cl).__send__(:authorization=, authorization) {} cl end driver = create_driver mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash)) driver.instance.writer assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService) end def test_configure_auth_compute_engine authorization = Object.new mock(Google::Auth::GCECredentials).new { authorization } mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl| mock(cl).__send__(:authorization=, authorization) {} cl end driver = create_driver(%[ table foo auth_method compute_engine project yourproject_id dataset yourdataset_id schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"} ] ]) mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash)) driver.instance.writer assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService) end def test_configure_auth_json_key_as_file json_key_path = 'test/plugin/testdata/json_key.json' authorization = Object.new mock(Google::Auth::ServiceAccountCredentials).make_creds(json_key_io: File.open(json_key_path), scope: API_SCOPE) { authorization } mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl| mock(cl).__send__(:authorization=, authorization) {} cl end driver = create_driver(%[ table foo auth_method json_key json_key #{json_key_path} project yourproject_id dataset yourdataset_id schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"} ] ]) mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash)) driver.instance.writer assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService) end def test_configure_auth_json_key_as_file_raise_permission_error json_key_path = 'test/plugin/testdata/json_key.json' json_key_path_dir = File.dirname(json_key_path) begin File.chmod(0000, json_key_path_dir) driver = create_driver(%[ table foo auth_method json_key json_key #{json_key_path} project yourproject_id dataset yourdataset_id schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"} ] ]) assert_raises(Errno::EACCES) do driver.instance.writer.client end ensure File.chmod(0755, json_key_path_dir) end end def test_configure_auth_json_key_as_string json_key = '{"private_key": "X", "client_email": "' + 'x' * 255 + '@developer.gserviceaccount.com"}' json_key_io = StringIO.new(json_key) authorization = Object.new mock(Google::Auth::ServiceAccountCredentials).make_creds(json_key_io: satisfy {|arg| JSON.parse(arg.read) == JSON.parse(json_key_io.read) }, scope: API_SCOPE) { authorization } mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl| mock(cl).__send__(:authorization=, authorization) {} cl end driver = create_driver(%[ table foo auth_method json_key json_key #{json_key} project yourproject_id dataset yourdataset_id schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"} ] ]) mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash)) driver.instance.writer assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService) end def test_configure_auth_application_default authorization = Object.new mock(Google::Auth).get_application_default([API_SCOPE]) { authorization } mock.proxy(Google::Apis::BigqueryV2::BigqueryService).new.with_any_args do |cl| mock(cl).__send__(:authorization=, authorization) {} cl end driver = create_driver(%[ table foo auth_method application_default project yourproject_id dataset yourdataset_id schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"} ] ]) mock.proxy(Fluent::BigQuery::Writer).new(duck_type(:info, :error, :warn), driver.instance.auth_method, is_a(Hash)) driver.instance.writer assert driver.instance.writer.client.is_a?(Google::Apis::BigqueryV2::BigqueryService) end def test_format_nested_time now = Time.now input = [ now, { "metadata" => { "node" => "mynode.example", }, "log" => "something", } ] expected = { "json" => { "metadata" => { "time" => now.strftime("%s").to_i, "node" => "mynode.example", }, "log" => "something", } } driver = create_driver(<<-CONFIG) table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field metadata.time schema [ {"name": "metadata", "type": "RECORD", "fields": [ {"name": "time", "type": "INTEGER"}, {"name": "node", "type": "STRING"} ]}, {"name": "log", "type": "STRING"} ] CONFIG driver.instance.start buf = driver.instance.format_stream("my.tag", [input]) driver.instance.shutdown assert_equal expected, MessagePack.unpack(buf) end def test_format_with_schema now = Time.now input = [ now, { "request" => { "vhost" => :bar, "path" => "/path/to/baz", "method" => "GET", "protocol" => "HTTP/0.9", "agent" => "libwww", "referer" => "http://referer.example", "time" => (now - 1).to_f, "bot_access" => true, "loginsession" => false, }, "response" => { "status" => "1", "bytes" => 3.0, }, "remote" => { "host" => "remote.example", "ip" => "192.0.2.1", "port" => 12345, "user" => "tagomoris", }, "something-else" => "would be ignored", "yet-another" => { "foo" => "bar", "baz" => 1, }, } ] expected = { "json" => { "time" => now.to_i, "request" => { "vhost" => "bar", "path" => "/path/to/baz", "method" => "GET", "protocol" => "HTTP/0.9", "agent" => "libwww", "referer" => "http://referer.example", "time" => (now - 1).to_f, "bot_access" => true, "loginsession" => false, }, "remote" => { "host" => "remote.example", "ip" => "192.0.2.1", "port" => 12345, "user" => "tagomoris", }, "response" => { "status" => 1, "bytes" => 3, }, "something-else" => "would be ignored", "yet-another" => { "foo" => "bar", "baz" => 1, }, } } driver = create_driver(<<-CONFIG) table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time schema_path #{File.join(File.dirname(__FILE__), "testdata", "apache.schema")} schema [{"name": "time", "type": "INTEGER"}] CONFIG driver.instance.start buf = driver.instance.format_stream("my.tag", [input]) driver.instance.shutdown assert_equal expected, MessagePack.unpack(buf) end def test_format_repeated_field_with_schema now = Time.now input = [ now, { "tty" => nil, "pwd" => "/home/yugui", "user" => "fluentd", "argv" => %w[ tail -f /var/log/fluentd/fluentd.log ] } ] expected = { "json" => { "time" => now.to_i, "pwd" => "/home/yugui", "user" => "fluentd", "argv" => %w[ tail -f /var/log/fluentd/fluentd.log ] } } driver = create_driver(<<-CONFIG) table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time schema_path #{File.join(File.dirname(__FILE__), "testdata", "sudo.schema")} schema [{"name": "time", "type": "INTEGER"}] CONFIG driver.instance.start buf = driver.instance.format_stream("my.tag", [input]) driver.instance.shutdown assert_equal expected, MessagePack.unpack(buf) end def test_format_fetch_from_bigquery_api now = Time.now input = [ now, { "tty" => nil, "pwd" => "/home/yugui", "user" => "fluentd", "argv" => %w[ tail -f /var/log/fluentd/fluentd.log ] } ] expected = { "json" => { "time" => now.to_i, "pwd" => "/home/yugui", "user" => "fluentd", "argv" => %w[ tail -f /var/log/fluentd/fluentd.log ] } } driver = create_driver(<<-CONFIG) table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time fetch_schema true schema [{"name": "time", "type": "INTEGER"}] CONFIG writer = stub_writer(driver) mock(writer).fetch_schema('yourproject_id', 'yourdataset_id', 'foo') do sudo_schema_response.deep_stringify_keys["schema"]["fields"] end driver.instance.start buf = driver.instance.format_stream("my.tag", [input]) driver.instance.shutdown assert_equal expected, MessagePack.unpack(buf) fields = driver.instance.instance_eval{ @fields } assert fields["time"] assert_equal :integer, fields["time"].type # DO NOT OVERWRITE assert_equal :nullable, fields["time"].mode # DO NOT OVERWRITE assert fields["tty"] assert_equal :string, fields["tty"].type assert_equal :nullable, fields["tty"].mode assert fields["pwd"] assert_equal :string, fields["pwd"].type assert_equal :required, fields["pwd"].mode assert fields["user"] assert_equal :string, fields["user"].type assert_equal :required, fields["user"].mode assert fields["argv"] assert_equal :string, fields["argv"].type assert_equal :repeated, fields["argv"].mode end def test_format_fetch_from_bigquery_api_with_generated_table_id now = Time.now input = [ now, { "tty" => nil, "pwd" => "/home/yugui", "user" => "fluentd", "argv" => %w[ tail -f /var/log/fluentd/fluentd.log ] } ] expected = { "json" => { "time" => now.to_i, "pwd" => "/home/yugui", "user" => "fluentd", "argv" => %w[ tail -f /var/log/fluentd/fluentd.log ] } } driver = create_driver(<<-CONFIG) table foo_%Y_%m_%d email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time fetch_schema true schema [{"name": "time", "type": "INTEGER"}] CONFIG writer = stub_writer(driver) mock(writer).fetch_schema('yourproject_id', 'yourdataset_id', now.strftime('foo_%Y_%m_%d')) do sudo_schema_response.deep_stringify_keys["schema"]["fields"] end driver.instance.start buf = driver.instance.format_stream("my.tag", [input]) driver.instance.shutdown assert_equal expected, MessagePack.unpack(buf) fields = driver.instance.instance_eval{ @fields } assert fields["time"] assert_equal :integer, fields["time"].type # DO NOT OVERWRITE assert_equal :nullable, fields["time"].mode # DO NOT OVERWRITE assert fields["tty"] assert_equal :string, fields["tty"].type assert_equal :nullable, fields["tty"].mode assert fields["pwd"] assert_equal :string, fields["pwd"].type assert_equal :required, fields["pwd"].mode assert fields["user"] assert_equal :string, fields["user"].type assert_equal :required, fields["user"].mode assert fields["argv"] assert_equal :string, fields["argv"].type assert_equal :repeated, fields["argv"].mode end def test_format_with_insert_id now = Time.now input = [ now, { "uuid" => "9ABFF756-0267-4247-847F-0895B65F0938", } ] expected = { "insert_id" => "9ABFF756-0267-4247-847F-0895B65F0938", "json" => { "uuid" => "9ABFF756-0267-4247-847F-0895B65F0938", } } driver = create_driver(<<-CONFIG) table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id insert_id_field uuid schema [{"name": "uuid", "type": "STRING"}] CONFIG driver.instance.start buf = driver.instance.format_stream("my.tag", [input]) driver.instance.shutdown assert_equal expected, MessagePack.unpack(buf) end def test_format_with_nested_insert_id now = Time.now input = [ now, { "data" => { "uuid" => "809F6BA7-1C16-44CD-9816-4B20E2C7AA2A", }, } ] expected = { "insert_id" => "809F6BA7-1C16-44CD-9816-4B20E2C7AA2A", "json" => { "data" => { "uuid" => "809F6BA7-1C16-44CD-9816-4B20E2C7AA2A", } } } driver = create_driver(<<-CONFIG) table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id insert_id_field data.uuid schema [{"name": "data", "type": "RECORD", "fields": [ {"name": "uuid", "type": "STRING"} ]}] CONFIG driver.instance.start buf = driver.instance.format_stream("my.tag", [input]) driver.instance.shutdown assert_equal expected, MessagePack.unpack(buf) end def test_format_for_load now = Time.now input = [ now, { "uuid" => "9ABFF756-0267-4247-847F-0895B65F0938", } ] expected = MultiJson.dump({ "uuid" => "9ABFF756-0267-4247-847F-0895B65F0938", }) + "\n" driver = create_driver(<<-CONFIG) method load table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id schema [{"name": "uuid", "type": "STRING"}] buffer_type memory CONFIG driver.instance.start buf = driver.instance.format_stream("my.tag", [input]) driver.instance.shutdown assert_equal expected, buf end def test_replace_record_key now = Time.now input = [ now, { "vhost" => :bar, "@referer" => "http://referer.example", "bot_access" => true, "login-session" => false } ] expected = { "json" => { "time" => now.to_i, "vhost" => "bar", "referer" => "http://referer.example", "bot_access" => true, "login_session" => false } } driver = create_driver(<<-CONFIG) table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id replace_record_key true replace_record_key_regexp1 - _ time_format %s time_field time schema [ {"name": "time", "type": "INTEGER"}, {"name": "vhost", "type": "STRING"}, {"name": "refere", "type": "STRING"}, {"name": "bot_access", "type": "BOOLEAN"}, {"name": "login_session", "type": "BOOLEAN"} ] CONFIG driver.instance.start buf = driver.instance.format_stream("my.tag", [input]) driver.instance.shutdown assert_equal expected, MessagePack.unpack(buf) end def test_convert_hash_to_json now = Time.now input = [ now, { "vhost" => :bar, "referer" => "http://referer.example", "bot_access" => true, "loginsession" => false, "remote" => { "host" => "remote.example", "ip" => "192.0.2.1", "port" => 12345, "user" => "tagomoris", } } ] expected = { "json" => { "time" => now.to_i, "vhost" => "bar", "referer" => "http://referer.example", "bot_access" => true, "loginsession" => false, "remote" => "{\"host\":\"remote.example\",\"ip\":\"192.0.2.1\",\"port\":12345,\"user\":\"tagomoris\"}" } } driver = create_driver(<<-CONFIG) table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id convert_hash_to_json true time_format %s time_field time schema [ {"name": "time", "type": "INTEGER"}, {"name": "vhost", "type": "STRING"}, {"name": "refere", "type": "STRING"}, {"name": "bot_access", "type": "BOOLEAN"}, {"name": "loginsession", "type": "BOOLEAN"} ] CONFIG driver.instance.start buf = driver.instance.format_stream("my.tag", [input]) driver.instance.shutdown assert_equal expected, MessagePack.unpack(buf) end def test_write entry = {json: {a: "b"}}, {json: {b: "c"}} driver = create_driver writer = stub_writer(driver) mock.proxy(writer).insert_rows('yourproject_id', 'yourdataset_id', 'foo', entry, template_suffix: nil) mock(writer.client).insert_all_table_data('yourproject_id', 'yourdataset_id', 'foo', { rows: entry, skip_invalid_rows: false, ignore_unknown_values: false }, {options: {timeout_sec: nil, open_timeout_sec: 60}}) do s = stub! s.insert_errors { nil } s end chunk = Fluent::MemoryBufferChunk.new("my.tag") entry.each do |e| chunk << e.to_msgpack end driver.instance.start driver.instance.write(chunk) driver.instance.shutdown end def test_write_with_retryable_error entry = {json: {a: "b"}}, {json: {b: "c"}} data_input = [ { "status_code" => 500 }, { "status_code" => 502 }, { "status_code" => 503 }, { "status_code" => 504 }, ] data_input.each do |d| driver = create_driver(<<-CONFIG) table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"}, {"name": "vhost", "type": "STRING"}, {"name": "path", "type": "STRING"}, {"name": "method", "type": "STRING"}, {"name": "protocol", "type": "STRING"}, {"name": "agent", "type": "STRING"}, {"name": "referer", "type": "STRING"}, {"name": "remote", "type": "RECORD", "fields": [ {"name": "host", "type": "STRING"}, {"name": "ip", "type": "STRING"}, {"name": "user", "type": "STRING"} ]}, {"name": "requesttime", "type": "FLOAT"}, {"name": "bot_access", "type": "BOOLEAN"}, {"name": "loginsession", "type": "BOOLEAN"} ] type file path error utc CONFIG writer = stub_writer(driver) mock(writer.client).insert_all_table_data('yourproject_id', 'yourdataset_id', 'foo', { rows: entry, skip_invalid_rows: false, ignore_unknown_values: false }, {options: {timeout_sec: nil, open_timeout_sec: 60}}) do ex = Google::Apis::ServerError.new("error", status_code: d["status_code"]) raise ex end chunk = Fluent::MemoryBufferChunk.new("my.tag") entry.each do |e| chunk << e.to_msgpack end driver.instance.start assert_raise Fluent::BigQuery::RetryableError do driver.instance.write(chunk) end driver.instance.shutdown end end def test_write_with_not_retryable_error entry = {json: {a: "b"}}, {json: {b: "c"}} driver = create_driver(<<-CONFIG) table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"}, {"name": "vhost", "type": "STRING"}, {"name": "path", "type": "STRING"}, {"name": "method", "type": "STRING"}, {"name": "protocol", "type": "STRING"}, {"name": "agent", "type": "STRING"}, {"name": "referer", "type": "STRING"}, {"name": "remote", "type": "RECORD", "fields": [ {"name": "host", "type": "STRING"}, {"name": "ip", "type": "STRING"}, {"name": "user", "type": "STRING"} ]}, {"name": "requesttime", "type": "FLOAT"}, {"name": "bot_access", "type": "BOOLEAN"}, {"name": "loginsession", "type": "BOOLEAN"} ] type file path error utc CONFIG writer = stub_writer(driver) mock(writer.client).insert_all_table_data('yourproject_id', 'yourdataset_id', 'foo', { rows: entry, skip_invalid_rows: false, ignore_unknown_values: false }, {options: {timeout_sec: nil, open_timeout_sec: 60}}) do ex = Google::Apis::ServerError.new("error", status_code: 501) def ex.reason "invalid" end raise ex end mock(driver.instance).flush_secondary(is_a(Fluent::Output)) chunk = Fluent::MemoryBufferChunk.new("my.tag") entry.each do |e| chunk << e.to_msgpack end driver.instance.start driver.instance.write(chunk) driver.instance.shutdown end def test_write_with_retryable_insert_errors data_input = [ { "error_count" => 1, "insert_error_count" => 1 }, { "error_count" => 10, "insert_error_count" => 1 }, { "error_count" => 10, "insert_error_count" => 10 }, ] data_input.each do |d| entry = {json: {a: "b"}}, {json: {b: "c"}} allow_retry_insert_errors = true driver = create_driver(<<-CONFIG) table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id allow_retry_insert_errors #{allow_retry_insert_errors} time_format %s time_field time schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"}, {"name": "vhost", "type": "STRING"}, {"name": "path", "type": "STRING"}, {"name": "method", "type": "STRING"}, {"name": "protocol", "type": "STRING"}, {"name": "agent", "type": "STRING"}, {"name": "referer", "type": "STRING"}, {"name": "remote", "type": "RECORD", "fields": [ {"name": "host", "type": "STRING"}, {"name": "ip", "type": "STRING"}, {"name": "user", "type": "STRING"} ]}, {"name": "requesttime", "type": "FLOAT"}, {"name": "bot_access", "type": "BOOLEAN"}, {"name": "loginsession", "type": "BOOLEAN"} ] type file path error utc CONFIG writer = stub_writer(driver) mock(writer.client).insert_all_table_data('yourproject_id', 'yourdataset_id', 'foo', { rows: entry, skip_invalid_rows: false, ignore_unknown_values: false }, {options: {timeout_sec: nil, open_timeout_sec: 60}}) do s = failure_insert_errors("timeout", d["error_count"], d["insert_error_count"]) s end chunk = Fluent::MemoryBufferChunk.new("my.tag") entry.each do |e| chunk << e.to_msgpack end driver.instance.start assert_raise Fluent::BigQuery::RetryableError do driver.instance.write(chunk) end driver.instance.shutdown end end def test_write_with_not_retryable_insert_errors data_input = [ { "allow_retry_insert_errors" => false, "reason" => "timeout" }, { "allow_retry_insert_errors" => true, "reason" => "stopped" }, ] data_input.each do |d| entry = {json: {a: "b"}}, {json: {b: "c"}} driver = create_driver(<<-CONFIG) table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id allow_retry_insert_errors #{d["allow_retry_insert_errors"]} time_format %s time_field time schema [ {"name": "time", "type": "INTEGER"}, {"name": "status", "type": "INTEGER"}, {"name": "bytes", "type": "INTEGER"}, {"name": "vhost", "type": "STRING"}, {"name": "path", "type": "STRING"}, {"name": "method", "type": "STRING"}, {"name": "protocol", "type": "STRING"}, {"name": "agent", "type": "STRING"}, {"name": "referer", "type": "STRING"}, {"name": "remote", "type": "RECORD", "fields": [ {"name": "host", "type": "STRING"}, {"name": "ip", "type": "STRING"}, {"name": "user", "type": "STRING"} ]}, {"name": "requesttime", "type": "FLOAT"}, {"name": "bot_access", "type": "BOOLEAN"}, {"name": "loginsession", "type": "BOOLEAN"} ] type file path error utc CONFIG writer = stub_writer(driver) mock(writer.client).insert_all_table_data('yourproject_id', 'yourdataset_id', 'foo', { rows: entry, skip_invalid_rows: false, ignore_unknown_values: false }, {options: {timeout_sec: nil, open_timeout_sec: 60}}) do s = failure_insert_errors(d["reason"], 1, 1) s end chunk = Fluent::MemoryBufferChunk.new("my.tag") entry.each do |e| chunk << e.to_msgpack end driver.instance.start driver.instance.write(chunk) driver.instance.shutdown end end def test_write_for_load schema_path = File.join(File.dirname(__FILE__), "testdata", "sudo.schema") entry = {a: "b"}, {b: "c"} driver = create_driver(<<-CONFIG) method load table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time schema_path #{schema_path} buffer_type memory CONFIG schema_fields = MultiJson.load(File.read(schema_path)).map(&:deep_symbolize_keys) writer = stub_writer(driver) chunk = Fluent::MemoryBufferChunk.new("my.tag") io = StringIO.new("hello") mock(driver.instance).create_upload_source(chunk).yields(io) mock(writer).wait_load_job(is_a(String), "yourproject_id", "yourdataset_id", "dummy_job_id", "foo") { nil } mock(writer.client).insert_job('yourproject_id', { configuration: { load: { destination_table: { project_id: 'yourproject_id', dataset_id: 'yourdataset_id', table_id: 'foo', }, schema: { fields: schema_fields, }, write_disposition: "WRITE_APPEND", source_format: "NEWLINE_DELIMITED_JSON", ignore_unknown_values: false, max_bad_records: 0, } } }, {upload_source: io, content_type: "application/octet-stream", options: {timeout_sec: nil, open_timeout_sec: 60}}) do s = stub! job_reference_stub = stub! s.job_reference { job_reference_stub } job_reference_stub.job_id { "dummy_job_id" } s end entry.each do |e| chunk << MultiJson.dump(e) + "\n" end driver.instance.start driver.instance.write(chunk) driver.instance.shutdown end def test_write_for_load_with_prevent_duplicate_load schema_path = File.join(File.dirname(__FILE__), "testdata", "sudo.schema") entry = {a: "b"}, {b: "c"} driver = create_driver(<<-CONFIG) method load table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time schema_path #{schema_path} prevent_duplicate_load true buffer_type memory CONFIG schema_fields = MultiJson.load(File.read(schema_path)).map(&:deep_symbolize_keys) chunk = Fluent::MemoryBufferChunk.new("my.tag") io = StringIO.new("hello") mock(driver.instance).create_upload_source(chunk).yields(io) writer = stub_writer(driver) mock(writer).wait_load_job(is_a(String), "yourproject_id", "yourdataset_id", "dummy_job_id", "foo") { nil } mock(writer.client).insert_job('yourproject_id', { configuration: { load: { destination_table: { project_id: 'yourproject_id', dataset_id: 'yourdataset_id', table_id: 'foo', }, schema: { fields: schema_fields, }, write_disposition: "WRITE_APPEND", source_format: "NEWLINE_DELIMITED_JSON", ignore_unknown_values: false, max_bad_records: 0, }, }, job_reference: {project_id: 'yourproject_id', job_id: satisfy { |x| x =~ /fluentd_job_.*/}} , }, {upload_source: io, content_type: "application/octet-stream", options: {timeout_sec: nil, open_timeout_sec: 60}}) do s = stub! job_reference_stub = stub! s.job_reference { job_reference_stub } job_reference_stub.job_id { "dummy_job_id" } s end entry.each do |e| chunk << MultiJson.dump(e) + "\n" end driver.instance.start driver.instance.write(chunk) driver.instance.shutdown end def test_write_for_load_with_retryable_error schema_path = File.join(File.dirname(__FILE__), "testdata", "sudo.schema") entry = {a: "b"}, {b: "c"} driver = create_driver(<<-CONFIG) method load table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time schema_path #{schema_path} buffer_type memory CONFIG schema_fields = MultiJson.load(File.read(schema_path)).map(&:deep_symbolize_keys) chunk = Fluent::MemoryBufferChunk.new("my.tag") io = StringIO.new("hello") mock(driver.instance).create_upload_source(chunk).yields(io) writer = stub_writer(driver) mock(writer.client).insert_job('yourproject_id', { configuration: { load: { destination_table: { project_id: 'yourproject_id', dataset_id: 'yourdataset_id', table_id: 'foo', }, schema: { fields: schema_fields, }, write_disposition: "WRITE_APPEND", source_format: "NEWLINE_DELIMITED_JSON", ignore_unknown_values: false, max_bad_records: 0, } } }, {upload_source: io, content_type: "application/octet-stream", options: {timeout_sec: nil, open_timeout_sec: 60}}) do s = stub! job_reference_stub = stub! s.job_reference { job_reference_stub } job_reference_stub.job_id { "dummy_job_id" } s end mock(writer.client).get_job('yourproject_id', 'dummy_job_id') do s = stub! status_stub = stub! error_result = stub! s.status { status_stub } status_stub.state { "DONE" } status_stub.error_result { error_result } status_stub.errors { nil } error_result.message { "error" } error_result.reason { "backendError" } s end entry.each do |e| chunk << MultiJson.dump(e) + "\n" end driver.instance.start assert_raise Fluent::BigQuery::RetryableError do driver.instance.write(chunk) end driver.instance.shutdown end def test_write_for_load_with_not_retryable_error schema_path = File.join(File.dirname(__FILE__), "testdata", "sudo.schema") entry = {a: "b"}, {b: "c"} driver = create_driver(<<-CONFIG) method load table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time schema_path #{schema_path} buffer_type memory type file path error utc CONFIG schema_fields = MultiJson.load(File.read(schema_path)).map(&:deep_symbolize_keys) chunk = Fluent::MemoryBufferChunk.new("my.tag") io = StringIO.new("hello") mock(driver.instance).create_upload_source(chunk).yields(io) writer = stub_writer(driver) mock(writer.client).insert_job('yourproject_id', { configuration: { load: { destination_table: { project_id: 'yourproject_id', dataset_id: 'yourdataset_id', table_id: 'foo', }, schema: { fields: schema_fields, }, write_disposition: "WRITE_APPEND", source_format: "NEWLINE_DELIMITED_JSON", ignore_unknown_values: false, max_bad_records: 0, } } }, {upload_source: io, content_type: "application/octet-stream", options: {timeout_sec: nil, open_timeout_sec: 60}}) do s = stub! job_reference_stub = stub! s.job_reference { job_reference_stub } job_reference_stub.job_id { "dummy_job_id" } s end mock(writer.client).get_job('yourproject_id', 'dummy_job_id') do s = stub! status_stub = stub! error_result = stub! s.status { status_stub } status_stub.state { "DONE" } status_stub.error_result { error_result } status_stub.errors { nil } error_result.message { "error" } error_result.reason { "invalid" } s end mock(driver.instance).flush_secondary(is_a(Fluent::Output)) entry.each do |e| chunk << MultiJson.dump(e) + "\n" end driver.instance.start driver.instance.write(chunk) driver.instance.shutdown end def test_write_with_row_based_table_id_formatting entry = [ {json: {a: "b", created_at: Time.local(2014,8,20,9,0,0).to_i}}, {json: {b: "c", created_at: Time.local(2014,8,21,9,0,0).to_i}} ] driver = create_driver(<<-CONFIG) table foo_%Y_%m_%d@created_at email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time field_integer time,status,bytes field_string vhost,path,method,protocol,agent,referer,remote.host,remote.ip,remote.user field_float requesttime field_boolean bot_access,loginsession CONFIG writer = stub_writer(driver) mock(writer.client).insert_all_table_data('yourproject_id', 'yourdataset_id', 'foo_2014_08_20', { rows: [entry[0]], skip_invalid_rows: false, ignore_unknown_values: false }, {options: {timeout_sec: nil, open_timeout_sec: 60}}) { stub!.insert_errors { nil } } mock(writer.client).insert_all_table_data('yourproject_id', 'yourdataset_id', 'foo_2014_08_21', { rows: [entry[1]], skip_invalid_rows: false, ignore_unknown_values: false }, {options: {timeout_sec: nil, open_timeout_sec: 60}}) { stub!.insert_errors { nil } } chunk = Fluent::MemoryBufferChunk.new("my.tag") entry.each do |object| chunk << object.to_msgpack end driver.instance.start driver.instance.write(chunk) driver.instance.shutdown end def test_generate_table_id_without_row driver = create_driver table_id_format = 'foo_%Y_%m_%d' time = Time.local(2014, 8, 11, 21, 20, 56) table_id = driver.instance.generate_table_id(table_id_format, time, nil) assert_equal 'foo_2014_08_11', table_id end def test_generate_table_id_with_row driver = create_driver table_id_format = 'foo_%Y_%m_%d@created_at' time = Time.local(2014, 8, 11, 21, 20, 56) row = { json: { created_at: Time.local(2014,8,10,21,20,57).to_i } } table_id = driver.instance.generate_table_id(table_id_format, time, row) assert_equal 'foo_2014_08_10', table_id end def test_generate_table_id_with_row_nested_attribute driver = create_driver table_id_format = 'foo_%Y_%m_%d@foo.bar.created_at' time = Time.local(2014, 8, 11, 21, 20, 56) row = { json: { foo: { bar: { created_at: Time.local(2014,8,10,21,20,57).to_i } } } } table_id = driver.instance.generate_table_id(table_id_format, time, row) assert_equal 'foo_2014_08_10', table_id end def test_generate_table_id_with_time_sliced_format driver = create_driver table_id_format = 'foo_%{time_slice}' current_time = Time.now time = Time.local(2014, 8, 11, 21, 20, 56) row = { "json" => { "foo" => "bar", "time" => time.to_i } } chunk = Object.new mock(chunk).key { time.strftime("%Y%m%d") } table_id = driver.instance.generate_table_id(table_id_format, current_time, row, chunk) assert_equal 'foo_20140811', table_id end def test_generate_table_id_with_attribute_replacement driver = create_driver table_id_format = 'foo_%Y_%m_%d_${baz}' current_time = Time.now time = Time.local(2014, 8, 11, 21, 20, 56) [ [ { baz: 1234 }, 'foo_2014_08_11_1234' ], [ { baz: 'piyo' }, 'foo_2014_08_11_piyo' ], [ { baz: true }, 'foo_2014_08_11_true' ], [ { baz: nil }, 'foo_2014_08_11_' ], [ { baz: '' }, 'foo_2014_08_11_' ], [ { baz: "_X-Y.Z !\n" }, 'foo_2014_08_11__XYZ' ], [ { baz: { xyz: 1 } }, 'foo_2014_08_11_xyz1' ], ].each do |attrs, expected| row = { json: { created_at: Time.local(2014,8,10,21,20,57).to_i }.merge(attrs) } table_id = driver.instance.generate_table_id(table_id_format, time, row) assert_equal expected, table_id end end def test_auto_create_table_by_bigquery_api now = Time.now message = { "json" => { "time" => now.to_i, "request" => { "vhost" => "bar", "path" => "/path/to/baz", "method" => "GET", "protocol" => "HTTP/1.0", "agent" => "libwww", "referer" => "http://referer.example", "time" => (now - 1).to_f, "bot_access" => true, "loginsession" => false, }, "remote" => { "host" => "remote.example", "ip" => "192.168.1.1", "user" => "nagachika", }, "response" => { "status" => 200, "bytes" => 72, }, } }.deep_symbolize_keys driver = create_driver(<<-CONFIG) table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time auto_create_table true schema_path #{File.join(File.dirname(__FILE__), "testdata", "apache.schema")} CONFIG writer = stub_writer(driver) mock(writer).insert_rows('yourproject_id', 'yourdataset_id', 'foo', [message], template_suffix: nil) { raise Fluent::BigQuery::RetryableError.new(nil, Google::Apis::ServerError.new("Not found: Table yourproject_id:yourdataset_id.foo", status_code: 404, body: "Not found: Table yourproject_id:yourdataset_id.foo")) } mock(writer).create_table('yourproject_id', 'yourdataset_id', 'foo', driver.instance.instance_variable_get(:@fields)) chunk = Fluent::MemoryBufferChunk.new("my.tag") chunk << message.to_msgpack driver.instance.start assert_raise(RuntimeError) { driver.instance.write(chunk) } driver.instance.shutdown end def test_auto_create_partitioned_table_by_bigquery_api now = Time.now message = { "json" => { "time" => now.to_i, "request" => { "vhost" => "bar", "path" => "/path/to/baz", "method" => "GET", "protocol" => "HTTP/1.0", "agent" => "libwww", "referer" => "http://referer.example", "time" => (now - 1).to_f, "bot_access" => true, "loginsession" => false, }, "remote" => { "host" => "remote.example", "ip" => "192.168.1.1", "user" => "nagachika", }, "response" => { "status" => 200, "bytes" => 72, }, } }.deep_symbolize_keys driver = create_driver(<<-CONFIG) table foo email foo@bar.example private_key_path /path/to/key project yourproject_id dataset yourdataset_id time_format %s time_field time auto_create_table true schema_path #{File.join(File.dirname(__FILE__), "testdata", "apache.schema")} time_partitioning_type day time_partitioning_expiration 1h CONFIG writer = stub_writer(driver) mock(writer).insert_rows('yourproject_id', 'yourdataset_id', 'foo', [message], template_suffix: nil) { raise Fluent::BigQuery::RetryableError.new(nil, Google::Apis::ServerError.new("Not found: Table yourproject_id:yourdataset_id.foo", status_code: 404, body: "Not found: Table yourproject_id:yourdataset_id.foo")) } mock(writer).create_table('yourproject_id', 'yourdataset_id', 'foo', driver.instance.instance_variable_get(:@fields)) chunk = Fluent::MemoryBufferChunk.new("my.tag") chunk << message.to_msgpack driver.instance.start assert_raise(RuntimeError) { driver.instance.write(chunk) } driver.instance.shutdown end private def sudo_schema_response { schema: { fields: [ { name: "time", type: "TIMESTAMP", mode: "REQUIRED" }, { name: "tty", type: "STRING", mode: "NULLABLE" }, { name: "pwd", type: "STRING", mode: "REQUIRED" }, { name: "user", type: "STRING", mode: "REQUIRED" }, { name: "argv", type: "STRING", mode: "REPEATED" } ] } } end end