# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'helper'
require 'json'
require 'webmock/test_unit'

# Tests for Fluent::GoogleCloudOutput.
#
# Every HTTP endpoint the tests touch (the metadata server, the OAuth2 token
# endpoint and the logging API) is stubbed with WebMock. Request bodies posted
# to the logging API stubs are parsed and collected in @logs_sent so that the
# tests can inspect what the plugin sent.
class GoogleCloudOutputTest < Test::Unit::TestCase
  # Values served by the stubbed metadata server.
  PROJECT_ID = 'test-project-id'
  ZONE = 'us-central1-b'
  FULLY_QUALIFIED_ZONE = 'projects/' + PROJECT_ID + '/zones/' + ZONE
  VM_ID = '9876543210'

  # Values served for the gae_backend_* instance attributes by
  # setup_managed_vm_metadata_stubs.
  MANAGED_VM_BACKEND_NAME = 'default'
  MANAGED_VM_BACKEND_VERSION = 'guestbook2.0'

  # Values used by the stubbed OAuth2 token endpoint.
  AUTH_GRANT_TYPE = 'urn:ietf:params:oauth:grant-type:jwt-bearer'
  FAKE_AUTH_TOKEN = 'abc123'

  # Plugin configurations exercised by the tests.
  COMPUTE_ENGINE_SERVICE_ACCOUNT_CONFIG = %[
  ]

  PRIVATE_KEY_CONFIG = %[
    auth_method private_key
    private_key_email 271661262351-ft99kc9kjro9rrihq3k2n3s2inbplu0q@developer.gserviceaccount.com
    private_key_path test/plugin/data/c31e573fd7f62ed495c9ca3821a5a85cb036dee1-privatekey.p12
  ]

  # private_key auth without a private_key_path.
  INVALID_CONFIG1 = %[
    auth_method private_key
    private_key_email nobody@example.com
  ]

  # private_key auth without a private_key_email.
  INVALID_CONFIG2 = %[
    auth_method private_key
    private_key_path /fake/path/to/key
  ]

  # An auth_method value the tests expect to be rejected.
  INVALID_CONFIG3 = %[
    auth_method service_account
  ]

  COMPUTE_SERVICE_NAME = 'compute.googleapis.com'
  APPENGINE_SERVICE_NAME = 'appengine.googleapis.com'

  # Expected service name, log name and labels for plain instance logs.
  COMPUTE_PARAMS = {
    'service_name' => COMPUTE_SERVICE_NAME,
    'log_name' => 'test',
    'labels' => {
      "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance',
      "#{COMPUTE_SERVICE_NAME}/resource_id" => VM_ID
    }
  }

  # Expected service name, log name and labels for managed VM logs.
  VMENGINE_PARAMS = {
    'service_name' => APPENGINE_SERVICE_NAME,
    'log_name' => "#{APPENGINE_SERVICE_NAME}%2Ftest",
    'labels' => {
      "#{APPENGINE_SERVICE_NAME}/module_id" => MANAGED_VM_BACKEND_NAME,
      "#{APPENGINE_SERVICE_NAME}/version_id" => MANAGED_VM_BACKEND_VERSION,
      "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance',
      "#{COMPUTE_SERVICE_NAME}/resource_id" => VM_ID
    }
  }

  # Batch sizes used by the multiple-log tests. Only a few values are tested
  # because otherwise the tests can take minutes.
  MULTIPLE_LOG_BATCH_SIZES = [2, 3, 5, 11, 50]

  def setup
    Fluent::Test.setup

    # Create stubs for all the GCE metadata lookups the agent needs to make.
    stub_metadata_request('project/project-id', PROJECT_ID)
    stub_metadata_request('instance/zone', FULLY_QUALIFIED_ZONE)
    stub_metadata_request('instance/id', VM_ID)
    stub_metadata_request('instance/attributes/',
                          "attribute1\nattribute2\nattribute3")

    # Stub the OAuth2 token exchange. Content-Length is derived from the body
    # in the same way as stub_metadata_request does.
    auth_response_body = "{\"access_token\": \"#{FAKE_AUTH_TOKEN}\"}"
    stub_request(:post, 'https://accounts.google.com/o/oauth2/token').
      with(:body => hash_including({:grant_type => AUTH_GRANT_TYPE})).
      to_return(:body => auth_response_body,
                :status => 200,
                :headers => {'Content-Length' => auth_response_body.length,
                             'Content-Type' => 'application/json'})

    @logs_sent = []
  end

  # Stubs the log write endpoints for both parameter sets; each stub records
  # the parsed JSON request body in @logs_sent and returns an empty body.
  def setup_logging_stubs
    [COMPUTE_PARAMS, VMENGINE_PARAMS].each do |params|
      stub_request(:post, uri_for_log(params)).to_return do |request|
        @logs_sent << JSON.parse(request.body)
        {:body => ''}
      end
    end
  end

  # Builds a buffered output test driver for the plugin, configured with conf.
  def create_driver(conf=PRIVATE_KEY_CONFIG)
    Fluent::Test::BufferedOutputTestDriver.new(
      Fluent::GoogleCloudOutput).configure(conf)
  end

  def test_configure_service_account
    d = create_driver(COMPUTE_ENGINE_SERVICE_ACCOUNT_CONFIG)
    assert_equal 'compute_engine_service_account', d.instance.auth_method
  end

  # This test used to share its name with test_configure_service_account,
  # which silently replaced that test; it has its own name now so both run.
  def test_configure_private_key
    d = create_driver(PRIVATE_KEY_CONFIG)
    assert_equal 'private_key', d.instance.auth_method
  end

  def test_configure_invalid_configs
    # Each invalid config must be rejected with a ConfigError whose message
    # mentions the offending parameter.
    [
      [INVALID_CONFIG1, 'private_key_path'],
      [INVALID_CONFIG2, 'private_key_email'],
      [INVALID_CONFIG3, 'auth_method']
    ].each do |config, expected_fragment|
      error = assert_raise(Fluent::ConfigError) { create_driver(config) }
      assert error.message.include?(expected_fragment),
             "Expected '#{expected_fragment}' in error message: " +
             error.message
    end
  end

  def test_metadata_loading
    d = create_driver(PRIVATE_KEY_CONFIG)
    d.run
    assert_equal PROJECT_ID, d.instance.project_id
    assert_equal ZONE, d.instance.zone
    assert_equal VM_ID, d.instance.vm_id
    assert_equal false, d.instance.running_on_managed_vm
  end

  def test_managed_vm_metadata_loading
    setup_managed_vm_metadata_stubs
    d = create_driver(PRIVATE_KEY_CONFIG)
    d.run
    assert_equal PROJECT_ID, d.instance.project_id
    assert_equal ZONE, d.instance.zone
    assert_equal VM_ID, d.instance.vm_id
    assert_equal true, d.instance.running_on_managed_vm
    assert_equal MANAGED_VM_BACKEND_NAME, d.instance.gae_backend_name
    assert_equal MANAGED_VM_BACKEND_VERSION, d.instance.gae_backend_version
  end

  def test_one_log
    setup_logging_stubs
    d = create_driver(PRIVATE_KEY_CONFIG)
    d.emit({'message' => log_entry(0)})
    d.run
    verify_log_entries(1, COMPUTE_PARAMS)
  end

  def test_timestamps
    setup_logging_stubs
    d = create_driver(PRIVATE_KEY_CONFIG)
    expected_ts = []
    emit_index = 0
    [Time.at(123456.789), Time.at(0), Time.now].each do |ts|
      # Test both the "native" fluentd timestamp and timeNanos.
      d.emit({'message' => log_entry(emit_index)}, ts.to_f)
      # The native timestamp currently only supports second granularity
      # (fluentd issue #461), so strip nanoseconds from the expected value.
      expected_ts.push(Time.at(ts.tv_sec))
      emit_index += 1
      d.emit({'message' => log_entry(emit_index),
              'timeNanos' => ts.tv_sec * 1_000_000_000 + ts.tv_nsec})
      expected_ts.push(ts)
      emit_index += 1
    end
    d.run
    verify_index = 0
    verify_log_entries(emit_index, COMPUTE_PARAMS) do |entry|
      assert_equal expected_ts[verify_index].tv_sec,
                   entry['metadata']['timestamp']['seconds'], entry
      assert_equal expected_ts[verify_index].tv_nsec,
                   entry['metadata']['timestamp']['nanos'], entry
      verify_index += 1
    end
  end

  def test_multiple_logs
    setup_logging_stubs
    d = create_driver(PRIVATE_KEY_CONFIG)
    emit_and_verify_batches(d, COMPUTE_PARAMS)
  end

  def test_client_error
    # The API Client should not retry this and the plugin should consume
    # the exception.
    stub_request(:post, uri_for_log(COMPUTE_PARAMS)).to_return(
      :status => 400, :body => "Bad Request")
    d = create_driver(PRIVATE_KEY_CONFIG)
    d.emit({'message' => log_entry(0)})
    d.run
    assert_requested(:post, uri_for_log(COMPUTE_PARAMS), :times => 1)
  end

  # Helper for the ClientError retriable special cases below. Stubs the log
  # endpoint to answer 401 with the given message, then asserts that the
  # request was made twice and that exactly one ClientError carrying that
  # message propagated out of the run.
  def client_error_helper(message)
    stub_request(:post, uri_for_log(COMPUTE_PARAMS)).to_return(
      :status => 401, :body => message)
    d = create_driver(PRIVATE_KEY_CONFIG)
    d.emit({'message' => log_entry(0)})
    exception_count = 0
    begin
      d.run
    rescue Google::APIClient::ClientError => error
      assert_equal message, error.message
      exception_count += 1
    end
    assert_requested(:post, uri_for_log(COMPUTE_PARAMS), :times => 2)
    assert_equal 1, exception_count
  end

  def test_client_error_invalid_credentials
    client_error_helper("Invalid Credentials")
  end

  def test_client_error_caller_does_not_have_permission
    client_error_helper("The caller does not have permission")
  end

  def test_client_error_request_had_invalid_credentials
    client_error_helper("Request had invalid credentials.")
  end

  def test_client_error_project_has_not_enabled_the_api
    client_error_helper("Project has not enabled the API. Please use Google Developers Console to activate the API for your project.")
  end

  def test_client_error_unable_to_fetch_access_token
    client_error_helper("Unable to fetch access token (no scopes configured?)")
  end

  def test_server_error
    # The API client should retry this once, then throw an exception which
    # gets propagated through the plugin.
    stub_request(:post, uri_for_log(COMPUTE_PARAMS)).to_return(
      :status => 500, :body => "Server Error")
    d = create_driver(PRIVATE_KEY_CONFIG)
    d.emit({'message' => log_entry(0)})
    exception_count = 0
    begin
      d.run
    rescue Google::APIClient::ServerError => error
      assert_equal 'Server Error', error.message
      exception_count += 1
    end
    assert_requested(:post, uri_for_log(COMPUTE_PARAMS), :times => 2)
    assert_equal 1, exception_count
  end

  def test_one_managed_vm_log
    setup_managed_vm_metadata_stubs
    setup_logging_stubs
    d = create_driver(PRIVATE_KEY_CONFIG)
    d.emit({'message' => log_entry(0)})
    d.run
    verify_log_entries(1, VMENGINE_PARAMS)
  end

  def test_multiple_managed_vm_logs
    setup_managed_vm_metadata_stubs
    setup_logging_stubs
    d = create_driver(PRIVATE_KEY_CONFIG)
    emit_and_verify_batches(d, VMENGINE_PARAMS)
  end

  private

  # Returns the log write URL for the log named by config['log_name'].
  def uri_for_log(config)
    "https://logging.googleapis.com/v1beta3/projects/#{PROJECT_ID}" \
      "/logs/#{config['log_name']}/entries:write"
  end

  # Stubs a GET on the metadata server for metadata_path so that it returns
  # response_body with status 200.
  def stub_metadata_request(metadata_path, response_body)
    stub_request(:get, 'http://metadata/computeMetadata/v1/' + metadata_path).
      to_return(:body => response_body, :status => 200,
                :headers => {'Content-Length' => response_body.length})
  end

  # Replaces the default attribute listing with one that includes the
  # gae_backend_* attributes and stubs their values.
  def setup_managed_vm_metadata_stubs
    stub_metadata_request(
      'instance/attributes/',
      "attribute1\ngae_backend_name\ngae_backend_version\nlast_attribute")
    stub_metadata_request('instance/attributes/gae_backend_name',
                          MANAGED_VM_BACKEND_NAME)
    stub_metadata_request('instance/attributes/gae_backend_version',
                          MANAGED_VM_BACKEND_VERSION)
  end

  # Returns the message text for the i-th test log entry.
  def log_entry(i)
    'test log entry ' + i.to_s
  end

  # For each size in MULTIPLE_LOG_BATCH_SIZES, emits that many entries through
  # driver, runs it and verifies what was sent against params.
  def emit_and_verify_batches(driver, params)
    MULTIPLE_LOG_BATCH_SIZES.each do |n|
      # The test driver doesn't clear its buffer of entries after running, so
      # do it manually here.
      driver.instance_variable_get('@entries').clear
      @logs_sent = []
      n.times { |i| driver.emit({'message' => log_entry(i)}) }
      driver.run
      verify_log_entries(n, params)
    end
  end

  # Asserts that the batch-level common_labels combined with the entry's own
  # labels are exactly expected_labels.
  def check_labels(entry, common_labels, expected_labels)
    # TODO(salty) test/handle overlap between common_labels and entry labels
    # Merge into a fresh hash: common_labels belongs to the batch and is
    # passed in again for every entry, so it must not be modified here.
    all_labels = (common_labels || {}).merge(entry['metadata']['labels'] || {})
    all_labels.each do |key, value|
      assert expected_labels.has_key?(key), "Unexpected label #{key} => #{value}"
      assert_equal expected_labels[key], value,
                   "Value mismatch - expected #{expected_labels[key]} in #{key} => #{value}"
    end
    assert_equal expected_labels.length, all_labels.length,
                 ("Expected #{expected_labels.length} labels, got " +
                  "#{all_labels.length}")
  end

  # Verifies that exactly n entries were sent, in order, with the expected
  # payload, zone, service name and labels.
  # The caller can optionally provide a block which is called for each entry.
  def verify_log_entries(n, params)
    i = 0
    @logs_sent.each do |batch|
      batch['entries'].each do |entry|
        assert_equal "test log entry #{i}", entry['textPayload'], batch
        assert_equal ZONE, entry['metadata']['zone']
        assert_equal params['service_name'], entry['metadata']['serviceName']
        check_labels entry, batch['commonLabels'], params['labels']
        yield(entry) if block_given?
        i += 1
        assert i <= n, "Number of entries #{i} exceeds expected number #{n}"
      end
    end
    assert_equal n, i, "Number of entries #{i} does not match expected number #{n}"
  end
end