test/plugin/base_test.rb in fluent-plugin-google-cloud-0.5.6 vs test/plugin/base_test.rb in fluent-plugin-google-cloud-0.6.0.v2.alpha.1

- old
+ new

@@ -17,10 +17,12 @@ require 'mocha/test_unit' require 'webmock/test_unit' # Unit tests for Google Cloud Logging plugin module BaseTest + include Fluent::GoogleCloudOutput::Constants + def setup Fluent::Test.setup # delete environment variables that googleauth uses to find credentials. ENV.delete('GOOGLE_APPLICATION_CREDENTIALS') # service account env. @@ -38,10 +40,12 @@ end # generic attributes HOSTNAME = Socket.gethostname + WRITE_LOG_ENTRIES_URI = 'https://logging.googleapis.com/v2beta1/entries:write' + # attributes used for the GCE metadata service PROJECT_ID = 'test-project-id' ZONE = 'us-central1-b' FULLY_QUALIFIED_ZONE = 'projects/' + PROJECT_ID + '/zones/' + ZONE VM_ID = '9876543210' @@ -94,10 +98,25 @@ CLOUDFUNCTIONS_CLUSTER_NAME = 'cluster-1' CLOUDFUNCTIONS_NAMESPACE_NAME = 'default' CLOUDFUNCTIONS_POD_NAME = 'd.dc.myu.uc.functionp.pc.name-a.a1.987-c0l82' CLOUDFUNCTIONS_CONTAINER_NAME = 'worker' + # Dataflow specific labels + DATAFLOW_REGION = 'us-central1' + DATAFLOW_JOB_NAME = 'job_name_1' + DATAFLOW_JOB_ID = 'job_id_1' + DATAFLOW_STEP_ID = 'step_1' + DATAFLOW_TAG = 'dataflow.googleapis.com/worker' + + # ML specific labels + ML_REGION = 'us-central1' + ML_JOB_ID = 'job_name_1' + ML_TASK_NAME = 'task_name_1' + ML_TRIAL_ID = 'trial_id_1' + ML_LOG_AREA = 'log_area_1' + ML_TAG = 'master-replica-0' + # Parameters used for authentication AUTH_GRANT_TYPE = 'urn:ietf:params:oauth:grant-type:jwt-bearer' FAKE_AUTH_TOKEN = 'abc123' # Information about test credentials files. @@ -169,147 +188,223 @@ CONFIG_EC2_PROJECT_ID_AND_CUSTOM_VM_ID = %( project_id #{EC2_PROJECT_ID} vm_id #{CUSTOM_VM_ID} ) - # Service configurations for various services - COMPUTE_SERVICE_NAME = 'compute.googleapis.com' - APPENGINE_SERVICE_NAME = 'appengine.googleapis.com' - CONTAINER_SERVICE_NAME = 'container.googleapis.com' - CLOUDFUNCTIONS_SERVICE_NAME = 'cloudfunctions.googleapis.com' - EC2_SERVICE_NAME = 'ec2.amazonaws.com' + CONFIG_DATAFLOW = %( + subservice_name "#{DATAFLOW_CONSTANTS[:service]}" + labels { + "#{DATAFLOW_CONSTANTS[:service]}/region" : "#{DATAFLOW_REGION}", + "#{DATAFLOW_CONSTANTS[:service]}/job_name" : "#{DATAFLOW_JOB_NAME}", + "#{DATAFLOW_CONSTANTS[:service]}/job_id" : "#{DATAFLOW_JOB_ID}" + } + label_map { "step": "#{DATAFLOW_CONSTANTS[:service]}/step_id" } + ) + CONFIG_ML = %( + subservice_name "#{ML_CONSTANTS[:service]}" + labels { + "#{ML_CONSTANTS[:service]}/job_id" : "#{ML_JOB_ID}", + "#{ML_CONSTANTS[:service]}/task_name" : "#{ML_TASK_NAME}", + "#{ML_CONSTANTS[:service]}/trial_id" : "#{ML_TRIAL_ID}" + } + label_map { "name": "#{ML_CONSTANTS[:service]}/job_id/log_area" } + ) + + # Service configurations for various services COMPUTE_PARAMS = { - service_name: COMPUTE_SERVICE_NAME, + resource: { + type: COMPUTE_CONSTANTS[:resource_type], + labels: { + 'instance_id' => VM_ID, + 'zone' => ZONE + } + }, log_name: 'test', project_id: PROJECT_ID, - zone: ZONE, labels: { - "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance', - "#{COMPUTE_SERVICE_NAME}/resource_id" => VM_ID, - "#{COMPUTE_SERVICE_NAME}/resource_name" => HOSTNAME + "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME } } VMENGINE_PARAMS = { - service_name: APPENGINE_SERVICE_NAME, - log_name: "#{APPENGINE_SERVICE_NAME}%2Ftest", + resource: { + type: APPENGINE_CONSTANTS[:resource_type], + labels: { + 'module_id' => MANAGED_VM_BACKEND_NAME, + 'version_id' => MANAGED_VM_BACKEND_VERSION + } + }, + log_name: "#{APPENGINE_CONSTANTS[:service]}%2Ftest", project_id: PROJECT_ID, - zone: ZONE, labels: { - "#{APPENGINE_SERVICE_NAME}/module_id" => MANAGED_VM_BACKEND_NAME, - "#{APPENGINE_SERVICE_NAME}/version_id" => MANAGED_VM_BACKEND_VERSION, - "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance', - "#{COMPUTE_SERVICE_NAME}/resource_id" => VM_ID, - "#{COMPUTE_SERVICE_NAME}/resource_name" => HOSTNAME + "#{COMPUTE_CONSTANTS[:service]}/resource_id" => VM_ID, + "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME, + "#{COMPUTE_CONSTANTS[:service]}/zone" => ZONE } } CONTAINER_TAG = "kubernetes.#{CONTAINER_POD_NAME}_" \ "#{CONTAINER_NAMESPACE_NAME}_#{CONTAINER_CONTAINER_NAME}" CONTAINER_FROM_METADATA_PARAMS = { - service_name: CONTAINER_SERVICE_NAME, + resource: { + type: CONTAINER_CONSTANTS[:resource_type], + labels: { + 'cluster_name' => CONTAINER_CLUSTER_NAME, + 'namespace_id' => CONTAINER_NAMESPACE_ID, + 'instance_id' => VM_ID, + 'pod_id' => CONTAINER_POD_ID, + 'container_name' => CONTAINER_CONTAINER_NAME, + 'zone' => ZONE + } + }, log_name: CONTAINER_CONTAINER_NAME, project_id: PROJECT_ID, - zone: ZONE, labels: { - "#{CONTAINER_SERVICE_NAME}/instance_id" => VM_ID, - "#{CONTAINER_SERVICE_NAME}/cluster_name" => CONTAINER_CLUSTER_NAME, - "#{CONTAINER_SERVICE_NAME}/namespace_name" => CONTAINER_NAMESPACE_NAME, - "#{CONTAINER_SERVICE_NAME}/namespace_id" => CONTAINER_NAMESPACE_ID, - "#{CONTAINER_SERVICE_NAME}/pod_name" => CONTAINER_POD_NAME, - "#{CONTAINER_SERVICE_NAME}/pod_id" => CONTAINER_POD_ID, - "#{CONTAINER_SERVICE_NAME}/container_name" => CONTAINER_CONTAINER_NAME, - "#{CONTAINER_SERVICE_NAME}/stream" => CONTAINER_STREAM, + "#{CONTAINER_CONSTANTS[:service]}/namespace_name" => + CONTAINER_NAMESPACE_NAME, + "#{CONTAINER_CONSTANTS[:service]}/pod_name" => CONTAINER_POD_NAME, + "#{CONTAINER_CONSTANTS[:service]}/stream" => CONTAINER_STREAM, "label/#{CONTAINER_LABEL_KEY}" => CONTAINER_LABEL_VALUE, - "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance', - "#{COMPUTE_SERVICE_NAME}/resource_id" => VM_ID, - "#{COMPUTE_SERVICE_NAME}/resource_name" => HOSTNAME + "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME } } # Almost the same as from metadata, but missing namespace_id and pod_id. CONTAINER_FROM_TAG_PARAMS = { - service_name: CONTAINER_SERVICE_NAME, + resource: { + type: CONTAINER_CONSTANTS[:resource_type], + labels: { + 'cluster_name' => CONTAINER_CLUSTER_NAME, + 'instance_id' => VM_ID, + 'container_name' => CONTAINER_CONTAINER_NAME, + 'zone' => ZONE + } + }, log_name: CONTAINER_CONTAINER_NAME, project_id: PROJECT_ID, - zone: ZONE, labels: { - "#{CONTAINER_SERVICE_NAME}/instance_id" => VM_ID, - "#{CONTAINER_SERVICE_NAME}/cluster_name" => CONTAINER_CLUSTER_NAME, - "#{CONTAINER_SERVICE_NAME}/namespace_name" => CONTAINER_NAMESPACE_NAME, - "#{CONTAINER_SERVICE_NAME}/pod_name" => CONTAINER_POD_NAME, - "#{CONTAINER_SERVICE_NAME}/container_name" => CONTAINER_CONTAINER_NAME, - "#{CONTAINER_SERVICE_NAME}/stream" => CONTAINER_STREAM, - "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance', - "#{COMPUTE_SERVICE_NAME}/resource_id" => VM_ID, - "#{COMPUTE_SERVICE_NAME}/resource_name" => HOSTNAME + "#{CONTAINER_CONSTANTS[:service]}/namespace_name" => + CONTAINER_NAMESPACE_NAME, + "#{CONTAINER_CONSTANTS[:service]}/pod_name" => CONTAINER_POD_NAME, + "#{CONTAINER_CONSTANTS[:service]}/stream" => CONTAINER_STREAM, + "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME } } CLOUDFUNCTIONS_TAG = "kubernetes.#{CLOUDFUNCTIONS_POD_NAME}_" \ "#{CLOUDFUNCTIONS_NAMESPACE_NAME}_" \ "#{CLOUDFUNCTIONS_CONTAINER_NAME}" CLOUDFUNCTIONS_PARAMS = { - service_name: CLOUDFUNCTIONS_SERVICE_NAME, + resource: { + type: CLOUDFUNCTIONS_CONSTANTS[:resource_type], + labels: { + 'function_name' => CLOUDFUNCTIONS_FUNCTION_NAME, + 'region' => CLOUDFUNCTIONS_REGION + } + }, log_name: 'cloud-functions', project_id: PROJECT_ID, - zone: ZONE, labels: { 'execution_id' => CLOUDFUNCTIONS_EXECUTION_ID, - "#{CLOUDFUNCTIONS_SERVICE_NAME}/function_name" => - CLOUDFUNCTIONS_FUNCTION_NAME, - "#{CLOUDFUNCTIONS_SERVICE_NAME}/region" => CLOUDFUNCTIONS_REGION, - "#{CONTAINER_SERVICE_NAME}/instance_id" => VM_ID, - "#{CONTAINER_SERVICE_NAME}/cluster_name" => CLOUDFUNCTIONS_CLUSTER_NAME, - "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance', - "#{COMPUTE_SERVICE_NAME}/resource_id" => VM_ID, - "#{COMPUTE_SERVICE_NAME}/resource_name" => HOSTNAME + "#{CONTAINER_CONSTANTS[:service]}/instance_id" => VM_ID, + "#{CONTAINER_CONSTANTS[:service]}/cluster_name" => + CLOUDFUNCTIONS_CLUSTER_NAME, + "#{COMPUTE_CONSTANTS[:service]}/resource_id" => VM_ID, + "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME, + "#{COMPUTE_CONSTANTS[:service]}/zone" => ZONE } } CLOUDFUNCTIONS_TEXT_NOT_MATCHED_PARAMS = { - service_name: CLOUDFUNCTIONS_SERVICE_NAME, + resource: { + type: CLOUDFUNCTIONS_CONSTANTS[:resource_type], + labels: { + 'function_name' => CLOUDFUNCTIONS_FUNCTION_NAME, + 'region' => CLOUDFUNCTIONS_REGION + } + }, log_name: 'cloud-functions', project_id: PROJECT_ID, - zone: ZONE, labels: { - "#{CLOUDFUNCTIONS_SERVICE_NAME}/function_name" => - CLOUDFUNCTIONS_FUNCTION_NAME, - "#{CLOUDFUNCTIONS_SERVICE_NAME}/region" => CLOUDFUNCTIONS_REGION, - "#{CONTAINER_SERVICE_NAME}/instance_id" => VM_ID, - "#{CONTAINER_SERVICE_NAME}/cluster_name" => CLOUDFUNCTIONS_CLUSTER_NAME, - "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance', - "#{COMPUTE_SERVICE_NAME}/resource_id" => VM_ID, - "#{COMPUTE_SERVICE_NAME}/resource_name" => HOSTNAME + "#{CONTAINER_CONSTANTS[:service]}/instance_id" => VM_ID, + "#{CONTAINER_CONSTANTS[:service]}/cluster_name" => + CLOUDFUNCTIONS_CLUSTER_NAME, + "#{COMPUTE_CONSTANTS[:service]}/resource_id" => VM_ID, + "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME, + "#{COMPUTE_CONSTANTS[:service]}/zone" => ZONE } } + DATAFLOW_PARAMS = { + resource: { + type: DATAFLOW_CONSTANTS[:resource_type], + labels: { + 'job_name' => DATAFLOW_JOB_NAME, + 'job_id' => DATAFLOW_JOB_ID, + 'step_id' => DATAFLOW_STEP_ID, + 'region' => DATAFLOW_REGION + } + }, + log_name: DATAFLOW_TAG, + project_id: PROJECT_ID, + labels: { + "#{COMPUTE_CONSTANTS[:service]}/resource_id" => VM_ID, + "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME, + "#{COMPUTE_CONSTANTS[:service]}/zone" => ZONE + } + } + + ML_PARAMS = { + resource: { + type: ML_CONSTANTS[:resource_type], + labels: { + 'job_id' => ML_JOB_ID, + 'task_name' => ML_TASK_NAME + } + }, + log_name: ML_TAG, + project_id: PROJECT_ID, + labels: { + "#{ML_CONSTANTS[:service]}/trial_id" => ML_TRIAL_ID, + "#{ML_CONSTANTS[:service]}/job_id/log_area" => ML_LOG_AREA, + "#{COMPUTE_CONSTANTS[:service]}/resource_id" => VM_ID, + "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME, + "#{COMPUTE_CONSTANTS[:service]}/zone" => ZONE + } + } + CUSTOM_PARAMS = { - service_name: COMPUTE_SERVICE_NAME, + resource: { + type: COMPUTE_CONSTANTS[:resource_type], + labels: { + 'instance_id' => CUSTOM_VM_ID, + 'zone' => CUSTOM_ZONE + } + }, log_name: 'test', project_id: CUSTOM_PROJECT_ID, - zone: CUSTOM_ZONE, labels: { - "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance', - "#{COMPUTE_SERVICE_NAME}/resource_id" => CUSTOM_VM_ID, - "#{COMPUTE_SERVICE_NAME}/resource_name" => CUSTOM_HOSTNAME + "#{COMPUTE_CONSTANTS[:service]}/resource_name" => CUSTOM_HOSTNAME } } EC2_PARAMS = { - service_name: EC2_SERVICE_NAME, + resource: { + type: EC2_CONSTANTS[:resource_type], + labels: { + 'instance_id' => EC2_VM_ID, + 'region' => EC2_PREFIXED_ZONE, + 'aws_account' => EC2_ACCOUNT_ID + } + }, log_name: 'test', project_id: EC2_PROJECT_ID, - zone: EC2_PREFIXED_ZONE, labels: { - "#{EC2_SERVICE_NAME}/resource_type" => 'instance', - "#{EC2_SERVICE_NAME}/resource_id" => EC2_VM_ID, - "#{EC2_SERVICE_NAME}/account_id" => EC2_ACCOUNT_ID, - "#{EC2_SERVICE_NAME}/resource_name" => HOSTNAME + "#{EC2_CONSTANTS[:service]}/resource_name" => HOSTNAME } } HTTP_REQUEST_MESSAGE = { 'requestMethod' => 'POST', @@ -432,16 +527,16 @@ assert_equal false, d.instance.running_on_managed_vm end def test_gce_used_when_detect_subservice_is_false setup_gce_metadata_stubs - # This would cause the service to be container.googleapis.com if not for the - # detect_subservice=false config. + # This would cause the resource type to be container.googleapis.com if not + # for the detect_subservice=false config. setup_container_metadata_stubs d = create_driver(NO_DETECT_SUBSERVICE_CONFIG) d.run - assert_equal COMPUTE_SERVICE_NAME, d.instance.service_name + assert_equal COMPUTE_CONSTANTS[:resource_type], d.instance.resource.type end def test_metadata_overrides { # In this case we are overriding all configured parameters so we should @@ -552,29 +647,29 @@ d.run end verify_log_entries(1, EC2_PARAMS) end - def test_struct_payload_log + def test_structured_payload_log setup_gce_metadata_stubs setup_logging_stubs do d = create_driver d.emit('msg' => log_entry(0), 'tag2' => 'test', 'data' => 5000, 'some_null_field' => nil) d.run end - verify_log_entries(1, COMPUTE_PARAMS, 'structPayload') do |entry| - fields = get_fields(entry['structPayload']) + verify_log_entries(1, COMPUTE_PARAMS, 'jsonPayload') do |entry| + fields = get_fields(entry['jsonPayload']) assert_equal 4, fields.size, entry assert_equal 'test log entry 0', get_string(fields['msg']), entry assert_equal 'test', get_string(fields['tag2']), entry assert_equal 5000, get_number(fields['data']), entry assert_equal null_value, fields['some_null_field'], entry end end - def test_struct_payload_malformatted_log + def test_structured_payload_malformatted_log setup_gce_metadata_stubs message = 'test message' setup_logging_stubs do d = create_driver d.emit( @@ -586,12 +681,12 @@ 'symbol_key' => { some_symbol: message }, 'nil_key' => { nil => message } ) d.run end - verify_log_entries(1, COMPUTE_PARAMS, 'structPayload') do |entry| - fields = get_fields(entry['structPayload']) + verify_log_entries(1, COMPUTE_PARAMS, 'jsonPayload') do |entry| + fields = get_fields(entry['jsonPayload']) assert_equal 7, fields.size, entry assert_equal message, get_string(get_fields(get_struct(fields \ ['int_key']))['1']), entry assert_equal message, get_string(get_fields(get_struct(fields \ ['int_array_key']))['[1, 2, 3, 4]']), entry @@ -606,11 +701,11 @@ assert_equal message, get_string(get_fields(get_struct(fields \ ['nil_key']))['']), entry end end - def test_struct_payload_json_log + def test_structured_payload_json_log setup_gce_metadata_stubs setup_logging_stubs do d = create_driver json_string = '{"msg": "test log entry 0", "tag2": "test", "data": 5000}' d.emit('message' => 'notJSON ' + json_string) @@ -622,11 +717,11 @@ verify_log_entries(4, COMPUTE_PARAMS, '') do |entry| assert entry.key?('textPayload'), 'Entry did not have textPayload' end end - def test_struct_payload_json_container_log + def test_structured_payload_json_container_log setup_gce_metadata_stubs setup_container_metadata_stubs setup_logging_stubs do d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG) json_string = '{"msg": "test log entry 0", "tag2": "test", ' \ @@ -641,12 +736,12 @@ 3, CONTAINER_FROM_METADATA_PARAMS, '') do |entry| log_index += 1 if log_index == 1 assert entry.key?('textPayload'), 'Entry did not have textPayload' else - assert entry.key?('structPayload'), 'Entry did not have structPayload' - fields = get_fields(entry['structPayload']) + assert entry.key?('jsonPayload'), 'Entry did not have jsonPayload' + fields = get_fields(entry['jsonPayload']) assert_equal 4, fields.size, entry assert_equal 'test log entry 0', get_string(fields['msg']), entry assert_equal 'test', get_string(fields['tag2']), entry assert_equal 5000, get_number(fields['data']), entry assert_equal null_value, fields['some_null_field'], entry @@ -663,11 +758,11 @@ @logs_sent = [] d = create_driver(REQUIRE_VALID_TAGS_CONFIG, tag) d.emit('msg' => log_entry(0)) d.run end - verify_log_entries(0, COMPUTE_PARAMS, 'structPayload') + verify_log_entries(0, COMPUTE_PARAMS, 'jsonPayload') end end # Verify that empty string container name should fail the kubernetes regex # match, thus the original tag is used as the log name. @@ -676,22 +771,21 @@ setup_container_metadata_stubs container_name = '' # This tag will not match the kubernetes regex because it requires a # non-empty container name. tag = container_tag_with_container_name(container_name) - params = CONTAINER_FROM_METADATA_PARAMS.merge( - labels: CONTAINER_FROM_METADATA_PARAMS[:labels].merge( - "#{CONTAINER_SERVICE_NAME}/container_name" => container_name), - log_name: tag) - setup_logging_stubs([params]) do - @logs_sent = [] + setup_logging_stubs do d = create_driver(REQUIRE_VALID_TAGS_CONFIG, tag) d.emit(container_log_entry_with_metadata(log_entry(0), container_name)) d.run end + params = CONTAINER_FROM_METADATA_PARAMS.merge( + resource: CONTAINER_FROM_METADATA_PARAMS[:resource].merge( + labels: CONTAINER_FROM_METADATA_PARAMS[:resource][:labels].merge( + 'container_name' => container_name)), + log_name: tag) verify_log_entries(1, params, 'textPayload') - assert_equal "projects/#{PROJECT_ID}/logs/#{tag}", @logs_sent[0]['logName'] end # Verify that container names with non-utf8 characters should be rejected when # 'require_valid_tags' is true. def test_reject_non_utf8_container_name_with_require_valid_tags_true @@ -699,80 +793,77 @@ setup_container_metadata_stubs non_utf8_tags = INVALID_TAGS.select do |tag, _| tag.is_a?(String) && !tag.empty? end non_utf8_tags.each do |container_name, encoded_name| - params = CONTAINER_FROM_METADATA_PARAMS.merge( - labels: CONTAINER_FROM_METADATA_PARAMS[:labels].merge( - "#{CONTAINER_SERVICE_NAME}/container_name" => - URI.decode(encoded_name)), - log_name: encoded_name) - setup_logging_stubs([params]) do + setup_logging_stubs do @logs_sent = [] d = create_driver(REQUIRE_VALID_TAGS_CONFIG, container_tag_with_container_name(container_name)) d.emit(container_log_entry_with_metadata(log_entry(0), container_name)) d.run end + params = CONTAINER_FROM_METADATA_PARAMS.merge( + labels: CONTAINER_FROM_METADATA_PARAMS[:labels].merge( + "#{CONTAINER_CONSTANTS[:service]}/container_name" => + URI.decode(encoded_name)), + log_name: encoded_name) verify_log_entries(0, params, 'textPayload') end end # Verify that tags are properly encoded. When 'require_valid_tags' is true, we # only accept string tags with utf8 characters. def test_encode_tags_with_require_valid_tags_true setup_gce_metadata_stubs VALID_TAGS.each do |tag, encoded_tag| - setup_logging_stubs([COMPUTE_PARAMS.merge(log_name: encoded_tag)]) do + setup_logging_stubs do @logs_sent = [] d = create_driver(REQUIRE_VALID_TAGS_CONFIG, tag) d.emit('msg' => log_entry(0)) d.run end - verify_log_entries(1, COMPUTE_PARAMS, 'structPayload') - assert_equal "projects/#{PROJECT_ID}/logs/#{encoded_tag}", - @logs_sent[0]['logName'] + verify_log_entries(1, COMPUTE_PARAMS.merge(log_name: encoded_tag), + 'jsonPayload') end end # Verify that tags extracted from container names are properly encoded. def test_encode_tags_from_container_name_with_require_valid_tags_true setup_gce_metadata_stubs setup_container_metadata_stubs VALID_TAGS.each do |tag, encoded_tag| - params = CONTAINER_FROM_METADATA_PARAMS.merge( - labels: CONTAINER_FROM_METADATA_PARAMS[:labels].merge( - "#{CONTAINER_SERVICE_NAME}/container_name" => tag), - log_name: encoded_tag) - setup_logging_stubs([params]) do + setup_logging_stubs do @logs_sent = [] d = create_driver(REQUIRE_VALID_TAGS_CONFIG, container_tag_with_container_name(tag)) d.emit(container_log_entry_with_metadata(log_entry(0), tag)) d.run end + params = CONTAINER_FROM_METADATA_PARAMS.merge( + resource: CONTAINER_FROM_METADATA_PARAMS[:resource].merge( + labels: CONTAINER_FROM_METADATA_PARAMS[:resource][:labels].merge( + 'container_name' => tag)), + log_name: encoded_tag) verify_log_entries(1, params, 'textPayload') - assert_equal "projects/#{PROJECT_ID}/logs/#{encoded_tag}", - @logs_sent[0]['logName'] end end # Verify that tags are properly encoded and sanitized. When # 'require_valid_tags' is false, we try to convert any non-string tags to # strings, and replace non-utf8 characters with a replacement string. def test_sanitize_tags_with_require_valid_tags_false setup_gce_metadata_stubs ALL_TAGS.each do |tag, sanitized_tag| - setup_logging_stubs([COMPUTE_PARAMS.merge(log_name: sanitized_tag)]) do + setup_logging_stubs do @logs_sent = [] d = create_driver(APPLICATION_DEFAULT_CONFIG, tag) d.emit('msg' => log_entry(0)) d.run end - verify_log_entries(1, COMPUTE_PARAMS, 'structPayload') - assert_equal "projects/#{PROJECT_ID}/logs/#{sanitized_tag}", - @logs_sent[0]['logName'] + verify_log_entries(1, COMPUTE_PARAMS.merge(log_name: sanitized_tag), + 'jsonPayload') end end # Verify that tags extracted from container names are properly encoded and # sanitized. @@ -785,25 +876,23 @@ # non-empty string cases here. string_tags = ALL_TAGS.select { |tag, _| tag.is_a?(String) && !tag.empty? } string_tags.each do |container_name, encoded_container_name| # Container name in the label is sanitized but not encoded, while the log # name is encoded. - params = CONTAINER_FROM_METADATA_PARAMS.merge( - labels: CONTAINER_FROM_METADATA_PARAMS[:labels].merge( - "#{CONTAINER_SERVICE_NAME}/container_name" => - URI.decode(encoded_container_name)), - log_name: encoded_container_name) - setup_logging_stubs([params]) do + setup_logging_stubs do @logs_sent = [] d = create_driver(APPLICATION_DEFAULT_CONFIG, container_tag_with_container_name(container_name)) d.emit(container_log_entry_with_metadata(log_entry(0), container_name)) d.run end + params = CONTAINER_FROM_METADATA_PARAMS.merge( + resource: CONTAINER_FROM_METADATA_PARAMS[:resource].merge( + labels: CONTAINER_FROM_METADATA_PARAMS[:resource][:labels].merge( + 'container_name' => URI.decode(encoded_container_name))), + log_name: encoded_container_name) verify_log_entries(1, params, 'textPayload') - assert_equal "projects/#{PROJECT_ID}/logs/#{encoded_container_name}", - @logs_sent[0]['logName'] end end def test_timestamps setup_gce_metadata_stubs @@ -831,36 +920,36 @@ d.run end end verify_index = 0 verify_log_entries(emit_index, COMPUTE_PARAMS) do |entry| - assert_equal_with_default entry['metadata']['timestamp']['seconds'], + assert_equal_with_default entry['timestamp']['seconds'], expected_ts[verify_index].tv_sec, 0, entry - assert_equal_with_default entry['metadata']['timestamp']['nanos'], + assert_equal_with_default entry['timestamp']['nanos'], expected_ts[verify_index].tv_nsec, 0, entry do # Fluentd v0.14 onwards supports nanosecond timestamp values. # Added in 600 ns delta to avoid flaky tests introduced # due to rounding error in double-precision floating-point numbers # (to account for the missing 9 bits of precision ~ 512 ns). # See http://wikipedia.org/wiki/Double-precision_floating-point_format assert_in_delta expected_ts[verify_index].tv_nsec, - entry['metadata']['timestamp']['nanos'], 600, entry + entry['timestamp']['nanos'], 600, entry end verify_index += 1 end end def test_malformed_timestamp setup_gce_metadata_stubs setup_logging_stubs do d = create_driver - # if timestamp is not a hash it is passed through to the struct payload. + # if timestamp is not a hash it is passed through to the json payload. d.emit('message' => log_entry(0), 'timestamp' => 'not-a-hash') d.run end - verify_log_entries(1, COMPUTE_PARAMS, 'structPayload') do |entry| - fields = get_fields(entry['structPayload']) + verify_log_entries(1, COMPUTE_PARAMS, 'jsonPayload') do |entry| + fields = get_fields(entry['jsonPayload']) assert_equal 2, fields.size, entry assert_equal 'not-a-hash', get_string(fields['timestamp']), entry end end @@ -935,11 +1024,11 @@ "label_number_two": "foo.googleapis.com/bar", "label3": "label3" } ) d = create_driver(config) - # not_a_label passes through to the struct payload + # not_a_label passes through to the json payload d.emit('message' => log_entry(0), 'label1' => 'value1', 'label_number_two' => 'value2', 'not_a_label' => 'value4', 'label3' => 'value3') @@ -948,12 +1037,12 @@ # make a deep copy of COMPUTE_PARAMS and add the parsed labels. params = Marshal.load(Marshal.dump(COMPUTE_PARAMS)) params[:labels]['sent_label_1'] = 'value1' params[:labels]['foo.googleapis.com/bar'] = 'value2' params[:labels]['label3'] = 'value3' - verify_log_entries(1, params, 'structPayload') do |entry| - fields = get_fields(entry['structPayload']) + verify_log_entries(1, params, 'jsonPayload') do |entry| + fields = get_fields(entry['jsonPayload']) assert_equal 2, fields.size, entry assert_equal 'test log entry 0', get_string(fields['message']), entry assert_equal 'value4', get_string(fields['not_a_label']), entry end end @@ -1021,15 +1110,13 @@ d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG) d.emit(container_log_entry_with_metadata(log_entry(0))) d.run end verify_log_entries(1, CONTAINER_FROM_METADATA_PARAMS) do |entry| - assert_equal CONTAINER_SECONDS_EPOCH, \ - entry['metadata']['timestamp']['seconds'], entry - assert_equal CONTAINER_NANOS, \ - entry['metadata']['timestamp']['nanos'], entry - assert_equal CONTAINER_SEVERITY, entry['metadata']['severity'], entry + assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], entry + assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry + assert_equal CONTAINER_SEVERITY, entry['severity'], entry end end def test_multiple_container_logs_metadata_from_plugin setup_gce_metadata_stubs @@ -1043,15 +1130,14 @@ d.instance_variable_get('@entries').clear n.times { |i| d.emit(container_log_entry_with_metadata(log_entry(i))) } d.run end verify_log_entries(n, CONTAINER_FROM_METADATA_PARAMS) do |entry| - assert_equal CONTAINER_SECONDS_EPOCH, \ - entry['metadata']['timestamp']['seconds'], entry - assert_equal CONTAINER_NANOS, \ - entry['metadata']['timestamp']['nanos'], entry - assert_equal CONTAINER_SEVERITY, entry['metadata']['severity'], entry + assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], + entry + assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry + assert_equal CONTAINER_SEVERITY, entry['severity'], entry end end end def test_multiple_container_logs_metadata_from_tag @@ -1066,15 +1152,14 @@ d.instance_variable_get('@entries').clear n.times { |i| d.emit(container_log_entry(log_entry(i))) } d.run end verify_log_entries(n, CONTAINER_FROM_TAG_PARAMS) do |entry| - assert_equal CONTAINER_SECONDS_EPOCH, \ - entry['metadata']['timestamp']['seconds'], entry - assert_equal CONTAINER_NANOS, \ - entry['metadata']['timestamp']['nanos'], entry - assert_equal CONTAINER_SEVERITY, entry['metadata']['severity'], entry + assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], + entry + assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry + assert_equal CONTAINER_SEVERITY, entry['severity'], entry end end end def test_one_container_log_metadata_from_tag @@ -1084,15 +1169,13 @@ d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG) d.emit(container_log_entry(log_entry(0))) d.run end verify_log_entries(1, CONTAINER_FROM_TAG_PARAMS) do |entry| - assert_equal CONTAINER_SECONDS_EPOCH, \ - entry['metadata']['timestamp']['seconds'], entry - assert_equal CONTAINER_NANOS, \ - entry['metadata']['timestamp']['nanos'], entry - assert_equal CONTAINER_SEVERITY, entry['metadata']['severity'], entry + assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], entry + assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry + assert_equal CONTAINER_SEVERITY, entry['severity'], entry end end def test_one_container_log_from_tag_stderr setup_gce_metadata_stubs @@ -1101,68 +1184,62 @@ d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG) d.emit(container_log_entry(log_entry(0), 'stderr')) d.run end expected_params = CONTAINER_FROM_TAG_PARAMS.merge( - labels: { "#{CONTAINER_SERVICE_NAME}/stream" => 'stderr' } + labels: { "#{CONTAINER_CONSTANTS[:service]}/stream" => 'stderr' } ) { |_, oldval, newval| oldval.merge(newval) } verify_log_entries(1, expected_params) do |entry| - assert_equal CONTAINER_SECONDS_EPOCH, \ - entry['metadata']['timestamp']['seconds'], entry - assert_equal CONTAINER_NANOS, \ - entry['metadata']['timestamp']['nanos'], entry - assert_equal 'ERROR', entry['metadata']['severity'], entry + assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], entry + assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry + assert_equal 'ERROR', entry['severity'], entry end end - def test_struct_container_log_metadata_from_plugin + def test_json_container_log_metadata_from_plugin setup_gce_metadata_stubs setup_container_metadata_stubs setup_logging_stubs do d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG) d.emit(container_log_entry_with_metadata('{"msg": "test log entry 0", ' \ '"tag2": "test", "data": ' \ '5000, "severity": "WARNING"}')) d.run end verify_log_entries(1, CONTAINER_FROM_METADATA_PARAMS, - 'structPayload') do |entry| - fields = get_fields(entry['structPayload']) + 'jsonPayload') do |entry| + fields = get_fields(entry['jsonPayload']) assert_equal 3, fields.size, entry assert_equal 'test log entry 0', get_string(fields['msg']), entry assert_equal 'test', get_string(fields['tag2']), entry assert_equal 5000, get_number(fields['data']), entry - assert_equal CONTAINER_SECONDS_EPOCH, \ - entry['metadata']['timestamp']['seconds'], entry - assert_equal CONTAINER_NANOS, \ - entry['metadata']['timestamp']['nanos'], entry - assert_equal 'WARNING', entry['metadata']['severity'], entry + assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], entry + assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry + assert_equal 'WARNING', entry['severity'], entry end end - def test_struct_container_log_metadata_from_tag + def test_json_container_log_metadata_from_tag setup_gce_metadata_stubs setup_container_metadata_stubs setup_logging_stubs do d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG) d.emit(container_log_entry('{"msg": "test log entry 0", ' \ '"tag2": "test", "data": 5000, ' \ '"severity": "W"}')) d.run end verify_log_entries(1, CONTAINER_FROM_TAG_PARAMS, - 'structPayload') do |entry| - fields = get_fields(entry['structPayload']) + 'jsonPayload') do |entry| + fields = get_fields(entry['jsonPayload']) assert_equal 3, fields.size, entry assert_equal 'test log entry 0', get_string(fields['msg']), entry assert_equal 'test', get_string(fields['tag2']), entry assert_equal 5000, get_number(fields['data']), entry - assert_equal CONTAINER_SECONDS_EPOCH, \ - entry['metadata']['timestamp']['seconds'], entry - assert_equal CONTAINER_NANOS, \ - entry['metadata']['timestamp']['nanos'], entry - assert_equal 'WARNING', entry['metadata']['severity'], entry + assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], entry + assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry + assert_equal 'WARNING', entry['severity'], entry end end def test_cloudfunctions_log setup_gce_metadata_stubs @@ -1176,11 +1253,11 @@ @logs_sent = [] n.times { |i| d.emit(cloudfunctions_log_entry(i)) } d.run end verify_log_entries(n, CLOUDFUNCTIONS_PARAMS) do |entry| - assert_equal 'DEBUG', entry['metadata']['severity'], + assert_equal 'DEBUG', entry['severity'], "Test with #{n} logs failed. \n#{entry}" end end end @@ -1197,11 +1274,11 @@ n.times { |i| d.emit(cloudfunctions_log_entry_text_not_matched(i)) } d.run end verify_log_entries( n, CLOUDFUNCTIONS_TEXT_NOT_MATCHED_PARAMS) do |entry| - assert_equal 'INFO', entry['metadata']['severity'], + assert_equal 'INFO', entry['severity'], "Test with #{n} logs failed. \n#{entry}" end end end @@ -1230,30 +1307,30 @@ def test_http_request_from_record setup_gce_metadata_stubs setup_logging_stubs do d = create_driver - d.emit('httpRequest' => http_request_message) + d.emit('httpRequest' => HTTP_REQUEST_MESSAGE) d.run end verify_log_entries(1, COMPUTE_PARAMS, 'httpRequest') do |entry| - assert_equal http_request_message, entry['httpRequest'], entry - assert_nil get_fields(entry['structPayload'])['httpRequest'], entry + assert_equal HTTP_REQUEST_MESSAGE, entry['httpRequest'], entry + assert_nil get_fields(entry['jsonPayload'])['httpRequest'], entry end end def test_http_request_partial_from_record setup_gce_metadata_stubs setup_logging_stubs do d = create_driver - d.emit('httpRequest' => http_request_message.merge( + d.emit('httpRequest' => HTTP_REQUEST_MESSAGE.merge( 'otherKey' => 'value')) d.run end verify_log_entries(1, COMPUTE_PARAMS, 'httpRequest') do |entry| - assert_equal http_request_message, entry['httpRequest'], entry - fields = get_fields(entry['structPayload']) + assert_equal HTTP_REQUEST_MESSAGE, entry['httpRequest'], entry + fields = get_fields(entry['jsonPayload']) request = get_fields(get_struct(fields['httpRequest'])) assert_equal 'value', get_string(request['otherKey']), entry end end @@ -1262,24 +1339,19 @@ setup_logging_stubs do d = create_driver d.emit('httpRequest' => 'a_string') d.run end - verify_log_entries(1, COMPUTE_PARAMS, 'structPayload') do |entry| - fields = get_fields(entry['structPayload']) + verify_log_entries(1, COMPUTE_PARAMS, 'jsonPayload') do |entry| + fields = get_fields(entry['jsonPayload']) assert_equal 'a_string', get_string(fields['httpRequest']), entry assert_nil entry['httpRequest'], entry end end private - def uri_for_log(params) - 'https://logging.googleapis.com/v1beta3/projects/' + params[:project_id] + - '/logs/' + params[:log_name] + '/entries:write' - end - def stub_metadata_request(metadata_path, response_body) stub_request(:get, 'http://169.254.169.254/computeMetadata/v1/' + metadata_path) .to_return(body: response_body, status: 200, headers: { 'Content-Length' => response_body.length }) @@ -1419,62 +1491,80 @@ stream: 'stdout', log: log_entry(i) } end + def dataflow_log_entry(i) + { + step: DATAFLOW_STEP_ID, + message: log_entry(i) + } + end + + def ml_log_entry(i) + { + name: ML_LOG_AREA, + message: log_entry(i) + } + end + def log_entry(i) 'test log entry ' + i.to_s end - def check_labels(entry, common_labels, expected_labels) - # TODO(salty) test/handle overlap between common_labels and entry labels - all_labels ||= common_labels - all_labels.merge!(entry['metadata']['labels'] || {}) - all_labels.each do |key, value| + def check_labels(labels, expected_labels) + labels.each do |key, value| assert value.is_a?(String), "Value #{value} for label #{key} " \ 'is not a string: ' + value.class.name assert expected_labels.key?(key), "Unexpected label #{key} => #{value}" assert_equal expected_labels[key], value, 'Value mismatch - expected ' \ "#{expected_labels[key]} in #{key} => #{value}" end - assert_equal expected_labels.length, all_labels.length, 'Expected ' \ - "#{expected_labels.length} labels, got #{all_labels.length}" + assert_equal expected_labels.length, labels.length, 'Expected ' \ + "#{expected_labels.length} labels: #{expected_labels}, got " \ + "#{labels.length} labels: #{labels}" end # The caller can optionally provide a block which is called for each entry. def verify_json_log_entries(n, params, payload_type = 'textPayload') i = 0 - @logs_sent.each do |batch| - batch['entries'].each do |entry| + @logs_sent.each do |request| + request['entries'].each do |entry| unless payload_type.empty? assert entry.key?(payload_type), 'Entry did not contain expected ' \ "#{payload_type} key: " + entry.to_s # Check the payload for textPayload, otherwise it's up to the caller. if payload_type == 'textPayload' - assert_equal "test log entry #{i}", entry['textPayload'], batch + assert_equal "test log entry #{i}", entry['textPayload'], request end end - assert_equal params[:zone], entry['metadata']['zone'] - assert_equal params[:service_name], entry['metadata']['serviceName'] - check_labels entry, batch['commonLabels'], params[:labels] + # per-entry resource or log_name overrides the corresponding field + # from the request. Labels are merged, with the per-entry label + # taking precedence in case of overlap. + resource = entry['resource'] || request['resource'] + log_name = entry['logName'] || request['logName'] + + labels ||= request['labels'] + labels.merge!(entry['labels'] || {}) + + assert_equal \ + "projects/#{params[:project_id]}/logs/#{params[:log_name]}", log_name + assert_equal params[:resource][:type], resource['type'] + check_labels resource['labels'], params[:resource][:labels] + check_labels labels, params[:labels] yield(entry) if block_given? i += 1 assert i <= n, "Number of entries #{i} exceeds expected number #{n}" end end assert i == n, "Number of entries #{i} does not match expected number #{n}" end - # The http request message to test against. - def http_request_message - HTTP_REQUEST_MESSAGE - end - # Replace the 'referer' field with nil. def http_request_message_with_nil_referer - http_request_message.merge('referer' => nil) + HTTP_REQUEST_MESSAGE.merge('referer' => nil) end # This module expects the methods below to be overridden. # Create a Fluentd output test driver with the Google Cloud Output plugin. @@ -1501,11 +1591,11 @@ # a plain equal. e.g. assert_in_delta. def assert_equal_with_default(_field, _expected_value, _default_value, _entry) _undefined end - # Get the fields of the struct payload. - def get_fields(_struct_payload) + # Get the fields of the payload. + def get_fields(_payload) _undefined end # Get the value of a struct field. def get_struct(_field)