test/plugin/base_test.rb in fluent-plugin-google-cloud-0.6.3 vs test/plugin/base_test.rb in fluent-plugin-google-cloud-0.6.4.pre.1

- old
+ new

@@ -97,34 +97,36 @@ d = create_driver d.run assert_equal PROJECT_ID, d.instance.project_id assert_equal ZONE, d.instance.zone assert_equal VM_ID, d.instance.vm_id - assert_equal false, d.instance.running_on_managed_vm + assert_equal COMPUTE_CONSTANTS[:resource_type], d.instance.resource.type end def test_managed_vm_metadata_loading setup_gce_metadata_stubs setup_managed_vm_metadata_stubs d = create_driver d.run assert_equal PROJECT_ID, d.instance.project_id assert_equal ZONE, d.instance.zone assert_equal VM_ID, d.instance.vm_id - assert_equal true, d.instance.running_on_managed_vm - assert_equal MANAGED_VM_BACKEND_NAME, d.instance.gae_backend_name - assert_equal MANAGED_VM_BACKEND_VERSION, d.instance.gae_backend_version + assert_equal APPENGINE_CONSTANTS[:resource_type], d.instance.resource.type + assert_equal MANAGED_VM_BACKEND_NAME, + d.instance.resource.labels['module_id'] + assert_equal MANAGED_VM_BACKEND_VERSION, + d.instance.resource.labels['version_id'] end def test_gce_metadata_does_not_load_when_use_metadata_service_is_false Fluent::GoogleCloudOutput.any_instance.expects(:fetch_metadata).never d = create_driver(NO_METADATA_SERVICE_CONFIG + CUSTOM_METADATA_CONFIG) d.run assert_equal CUSTOM_PROJECT_ID, d.instance.project_id assert_equal CUSTOM_ZONE, d.instance.zone assert_equal CUSTOM_VM_ID, d.instance.vm_id - assert_equal false, d.instance.running_on_managed_vm + assert_equal COMPUTE_CONSTANTS[:resource_type], d.instance.resource.type end def test_gce_used_when_detect_subservice_is_false setup_gce_metadata_stubs # This would cause the resource type to be container.googleapis.com if not @@ -154,12 +156,10 @@ d = create_driver(config) d.run assert_equal parts[1], d.instance.project_id, "Index #{index} failed." assert_equal parts[2], d.instance.zone, "Index #{index} failed." assert_equal parts[3], d.instance.vm_id, "Index #{index} failed." - assert_equal false, d.instance.running_on_managed_vm, - "Index #{index} failed." end end def test_ec2_metadata_requires_project_id setup_ec2_metadata_stubs @@ -699,84 +699,23 @@ end verify_log_entries(n, VMENGINE_PARAMS) end end - def test_one_container_log_metadata_from_plugin - setup_gce_metadata_stubs - setup_container_metadata_stubs - setup_logging_stubs do - d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG) - d.emit(container_log_entry_with_metadata(log_entry(0))) - d.run - end - verify_log_entries(1, CONTAINER_FROM_METADATA_PARAMS) do |entry| - assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], entry - assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry - assert_equal CONTAINER_SEVERITY, entry['severity'], entry - end + # Test container logs when metadata is extracted from the 'kubernetes' field + # in the log record. + def test_container_logs_metadata_from_record + verify_container_logs(method(:container_log_entry_with_metadata), + CONTAINER_FROM_METADATA_PARAMS) end - def test_multiple_container_logs_metadata_from_plugin - setup_gce_metadata_stubs - setup_container_metadata_stubs - [2, 3, 5, 11, 50].each do |n| - @logs_sent = [] - setup_logging_stubs do - d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG) - # The test driver doesn't clear its buffer of entries after running, so - # do it manually here. - d.instance_variable_get('@entries').clear - n.times { |i| d.emit(container_log_entry_with_metadata(log_entry(i))) } - d.run - end - verify_log_entries(n, CONTAINER_FROM_METADATA_PARAMS) do |entry| - assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], - entry - assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry - assert_equal CONTAINER_SEVERITY, entry['severity'], entry - end - end + # Test container logs when metadata is extracted from the tag. + def test_container_logs_metadata_from_tag + verify_container_logs(method(:container_log_entry), + CONTAINER_FROM_TAG_PARAMS) end - def test_multiple_container_logs_metadata_from_tag - setup_gce_metadata_stubs - setup_container_metadata_stubs - [2, 3, 5, 11, 50].each do |n| - @logs_sent = [] - setup_logging_stubs do - d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG) - # The test driver doesn't clear its buffer of entries after running, so - # do it manually here. - d.instance_variable_get('@entries').clear - n.times { |i| d.emit(container_log_entry(log_entry(i))) } - d.run - end - verify_log_entries(n, CONTAINER_FROM_TAG_PARAMS) do |entry| - assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], - entry - assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry - assert_equal CONTAINER_SEVERITY, entry['severity'], entry - end - end - end - - def test_one_container_log_metadata_from_tag - setup_gce_metadata_stubs - setup_container_metadata_stubs - setup_logging_stubs do - d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG) - d.emit(container_log_entry(log_entry(0))) - d.run - end - verify_log_entries(1, CONTAINER_FROM_TAG_PARAMS) do |entry| - assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], entry - assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry - assert_equal CONTAINER_SEVERITY, entry['severity'], entry - end - end - def test_one_container_log_from_tag_stderr setup_gce_metadata_stubs setup_container_metadata_stubs setup_logging_stubs do d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG) @@ -837,10 +776,138 @@ assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry assert_equal 'WARNING', entry['severity'], entry end end + # Docker Container. + + # Test textPayload logs from Docker container stderr / stdout. + def test_text_payload_docker_container_logs + setup_gce_metadata_stubs + [1, 2, 3, 5, 11, 50].each do |n| + @logs_sent = [] + setup_logging_stubs do + d = create_driver(APPLICATION_DEFAULT_CONFIG, DOCKER_CONTAINER_TAG) + # Generate log entries with 'log' (simply a string) and 'time' fields. + n.times { |i| d.emit(docker_container_log_entry(log_entry(i))) } + d.run + end + # 'container_id' and 'container_name' should have been extracted from tag + # and properly set in resource.labels and common metadata labels as in + # DOCKER_CONTAINER_PARAMS. + verify_log_entries(n, DOCKER_CONTAINER_PARAMS) do |entry| + # Timestamp in 'time' field from log entry should be set properly. + assert_equal DOCKER_CONTAINER_SECONDS_EPOCH, + entry['timestamp']['seconds'], entry + assert_equal DOCKER_CONTAINER_NANOS, entry['timestamp']['nanos'], + entry + # Severity is 'DEFAULT' because 'stream' info is absent from log entry. + assert_equal_with_default entry['severity'], 'DEFAULT', 'DEFAULT', entry + end + end + end + + # Test jsonPayload logs from Docker container stderr / stdout. + def test_json_payload_docker_container_logs + setup_gce_metadata_stubs + [1, 2, 3, 5, 11, 50].each do |n| + @logs_sent = [] + setup_logging_stubs do + d = create_driver(APPLICATION_DEFAULT_CONFIG, DOCKER_CONTAINER_TAG) + n.times do + # Generate log entries with 'log' (json) and 'time' fields. + d.emit(docker_container_log_entry('{"msg": "test log entry ' \ + "#{n}" \ + '", "tag2": "test", "data": ' \ + '5000, "severity": "WARNING"}')) + end + d.run + end + # 'container_id' and 'container_name' should have been extracted from tag + # and properly set in resource.labels and common metadata labels as in + # DOCKER_CONTAINER_PARAMS. + verify_log_entries(n, DOCKER_CONTAINER_PARAMS, 'jsonPayload') do |entry| + # 'log' field should be detected as json and parsed properly. + fields = get_fields(entry['jsonPayload']) + assert_equal 3, fields.size, entry + assert_equal "test log entry #{n}", get_string(fields['msg']), entry + assert_equal 'test', get_string(fields['tag2']), entry + assert_equal 5000, get_number(fields['data']), entry + # Timestamp in 'time' field from log entry should be set properly. + assert_equal DOCKER_CONTAINER_SECONDS_EPOCH, + entry['timestamp']['seconds'], entry + assert_equal DOCKER_CONTAINER_NANOS, entry['timestamp']['nanos'], entry + # Severity in 'severity' field from log entry should be set properly. + assert_equal 'WARNING', entry['severity'], entry + end + end + end + + # Test the following metadata is properly extracted from json record: + # * container_id + # * container_name + # * source (this field maps to the 'stream' metadata label. + def test_docker_container_logs_metadata_from_json_record + setup_gce_metadata_stubs + { + # 'stream' label should be extracted from json record when present. + docker_container_log_entry_with_metadata( + log_entry(0) + ) => { + # When the 'source' field from the json record has value 'stdout', + # 'severity' should be 'INFO'. + 'params' => DOCKER_CONTAINER_PARAMS_WITH_STREAM_STDOUT, + 'severity' => 'INFO' + }, + # 'container_id' and 'container_name' can be extracted from tag or + # json record. If present in both cases, values in the json record will + # overwrite values in tags. For example: + # DOCKER_CONTAINER_PARAMS_WITH_METADATA_OVERWRITTEN has different + # 'container_id' and 'container_name' values from DOCKER_CONTAINER_TAG, + # and we need to verify these two fields get overwritten as expected. + docker_container_log_entry_with_metadata( + log_entry(0), DOCKER_CONTAINER_ID2, DOCKER_CONTAINER_NAME2, + DOCKER_CONTAINER_STREAM_STDERR + ) => { + # When the 'source' field from the json record has value 'stderr', + # 'severity' should be 'ERROR'. + 'params' => DOCKER_CONTAINER_PARAMS_WITH_METADATA_OVERWRITTEN, + 'severity' => 'ERROR' + } + }.each do |log_entry, expected| + @logs_sent = [] + setup_logging_stubs do + d = create_driver(APPLICATION_DEFAULT_CONFIG, DOCKER_CONTAINER_TAG) + d.emit(log_entry) + d.run + end + verify_log_entries(1, expected['params']) do |entry| + assert_equal DOCKER_CONTAINER_SECONDS_EPOCH, + entry['timestamp']['seconds'], entry + assert_equal DOCKER_CONTAINER_NANOS, entry['timestamp']['nanos'], + entry + assert_equal_with_default entry['severity'], expected['severity'], + 'DEFAULT', entry + end + end + end + + # Test logs from applications running in Docker containers. + def test_docker_container_application_logs + setup_gce_metadata_stubs + setup_docker_remote_api_stubs + setup_logging_stubs do + # Metadata Agent is not enabled. Will call Docker Remote API for + # container info. + d = create_driver(APPLICATION_DEFAULT_CONFIG, + DOCKER_CONTAINER_TAG_WITH_APPLICATION) + d.emit('message' => log_entry(0)) + d.run + end + verify_log_entries(1, DOCKER_CONTAINER_WITH_APPLICATION_PARAMS) + end + def test_cloudfunctions_log setup_gce_metadata_stubs setup_cloudfunctions_metadata_stubs [1, 2, 3, 5, 11, 50].each do |n| setup_logging_stubs do @@ -901,10 +968,21 @@ i += 1 end end end + def test_dataproc_log + setup_gce_metadata_stubs + setup_dataproc_metadata_stubs + setup_logging_stubs do + d = create_driver + d.emit(dataproc_log_entry('test message')) + d.run + end + verify_log_entries(1, DATAPROC_PARAMS, 'jsonPayload') + end + def test_http_request_from_record setup_gce_metadata_stubs setup_logging_stubs do d = create_driver d.emit('httpRequest' => HTTP_REQUEST_MESSAGE) @@ -1001,10 +1079,76 @@ assert_nil get_fields(entry['jsonPayload'])['httpRequest'], entry end end end + # Metadata Agent related tests. + def test_configure_enable_metadata_agent_default_and_false + setup_gce_metadata_stubs + [create_driver, create_driver(DISABLE_METADATA_AGENT_CONFIG)].each do |d| + assert_false d.instance.instance_variable_get(:@enable_metadata_agent) + end + end + + def test_configure_enable_metadata_agent_true + setup_gce_metadata_stubs + setup_metadata_agent_stubs(IMPLICIT_MONITORED_RESOURCE_UNIQUE_KEY) + d = create_driver(ENABLE_METADATA_AGENT_CONFIG) + assert_true d.instance.instance_variable_get(:@enable_metadata_agent) + end + + # Test an implicit monitored resource can be retrieved from Metadata Agent + # with an empty string as the locally-unique id. + def test_retrieve_implicit_monitored_resource + # Minimum set up for gce metadata stubs. Stubs for 'vm_id' and 'location' + # info are deliberately not set up. + setup_gce_metadata_stubs_minimum + setup_metadata_agent_stubs(IMPLICIT_MONITORED_RESOURCE_UNIQUE_KEY) + setup_logging_stubs do + d = create_driver(ENABLE_METADATA_AGENT_CONFIG) + d.emit('message' => log_entry(0)) + d.run + end + verify_log_entries(1, COMPUTE_PARAMS) + end + + # Test a docker container monitored resource can be retrieved from Metadata + # Agent with "container.<container_id>" as the locally-unique id. + def test_retrieve_docker_container_monitored_resource + setup_gce_metadata_stubs_minimum + setup_metadata_agent_stubs(IMPLICIT_MONITORED_RESOURCE_UNIQUE_KEY) + setup_metadata_agent_stubs(DOCKER_CONSTANTS[:resource_type]) + setup_logging_stubs do + # Tag format here: "container.<container_id>.<container_name>" + d = create_driver(ENABLE_METADATA_AGENT_CONFIG, DOCKER_CONTAINER_TAG) + d.emit('message' => log_entry(0)) + d.run + end + # gce_metadata_stubs has ZONE1, while metadata agent stub has ZONE2. Here we + # need to verify ZONE2 overwrites ZONE1 as expected. + verify_log_entries(1, DOCKER_CONTAINER_PARAMS_WITH_ZONE2) + end + + # Test a docker container monitored resource can be retrieved from Metadata + # Agent with "container.<container_name>" as the locally-unique id when the + # log entry comes from an application running in the docker container. + def test_retrieve_docker_container_monitored_resource_with_application + setup_gce_metadata_stubs_minimum + setup_metadata_agent_stubs(IMPLICIT_MONITORED_RESOURCE_UNIQUE_KEY) + setup_metadata_agent_stubs( + "#{DOCKER_CONSTANTS[:resource_type]}_application") + setup_logging_stubs do + # Tag format here: "application-container.<container_name>. + # <additional_tag>" + d = create_driver(ENABLE_METADATA_AGENT_CONFIG, + DOCKER_CONTAINER_TAG_WITH_APPLICATION) + d.emit('message' => log_entry(0)) + d.run + end + verify_log_entries(1, DOCKER_CONTAINER_WITH_APPLICATION_PARAMS) + end + private def stub_metadata_request(metadata_path, response_body) stub_request(:get, 'http://169.254.169.254/computeMetadata/v1/' + metadata_path) @@ -1037,10 +1181,30 @@ status: 200, headers: { 'Content-Length' => FAKE_AUTH_TOKEN.length, 'Content-Type' => 'application/json' }) end + # The minimum stubs needed for infomation that Metadata Agent can not provide. + def setup_gce_metadata_stubs_minimum + # Stub the root, used for platform detection by the plugin and 'googleauth'. + stub_request(:get, 'http://169.254.169.254') + .to_return(status: 200, headers: { 'Metadata-Flavor' => 'Google' }) + + # Create stubs for all the GCE metadata lookups the agent needs to make. + stub_metadata_request('project/project-id', PROJECT_ID) + stub_metadata_request('instance/attributes/', + "attribute1\nattribute2\nattribute3") + + # Used by 'googleauth' to fetch the default service account credentials. + stub_request(:get, 'http://169.254.169.254/computeMetadata/v1/' \ + 'instance/service-accounts/default/token') + .to_return(body: %({"access_token": "#{FAKE_AUTH_TOKEN}"}), + status: 200, + headers: { 'Content-Length' => FAKE_AUTH_TOKEN.length, + 'Content-Type' => 'application/json' }) + end + def setup_ec2_metadata_stubs # Stub the root, used for platform detection. stub_request(:get, 'http://169.254.169.254') .to_return(status: 200, headers: { 'Server' => 'EC2ws' }) @@ -1087,10 +1251,15 @@ 'INSTANCE_PREFIX: '\ "gke-#{CONTAINER_CLUSTER_NAME}-740fdafa\n"\ 'KUBE_BEARER_TOKEN: AoQiMuwkNP2BMT0S') end + def setup_docker_remote_api_stubs + stub_request(:get, "http://unix/containers/#{DOCKER_CONTAINER_NAME}/json") + .to_return(status: 200, body: "{\"Id\":\"#{DOCKER_CONTAINER_ID}\"}") + end + def setup_cloudfunctions_metadata_stubs stub_metadata_request( 'instance/attributes/', "attribute1\nkube-env\ngcf_region\nlast_attribute") stub_metadata_request('instance/attributes/kube-env', @@ -1100,15 +1269,36 @@ 'KUBE_BEARER_TOKEN: AoQiMuwkNP2BMT0S') stub_metadata_request('instance/attributes/gcf_region', CLOUDFUNCTIONS_REGION) end + def setup_dataproc_metadata_stubs + stub_metadata_request( + 'instance/attributes/', + "attribute1\ndataproc-cluster-uuid\ndataproc-cluster-name") + stub_metadata_request('instance/attributes/dataproc-cluster-name', + DATAPROC_CLUSTER_NAME) + stub_metadata_request('instance/attributes/dataproc-cluster-uuid', + DATAPROC_CLUSTER_UUID) + stub_metadata_request('instance/attributes/dataproc-region', + DATAPROC_REGION) + end + + def setup_metadata_agent_stubs(resource_type) + unique_id = MONITORED_RESOURCE_STUBS[resource_type]['unique_id'] + resource = MONITORED_RESOURCE_STUBS[resource_type]['monitored_resource'] + stub_request(:get, 'http://local-metadata-agent.stackdriver.com:8000' \ + "/monitoredResource/#{unique_id}") + .to_return(status: 200, body: resource) + end + def container_tag_with_container_name(container_name) "kubernetes.#{CONTAINER_POD_NAME}_#{CONTAINER_NAMESPACE_NAME}_" \ "#{container_name}" end + # GKE Container def container_log_entry_with_metadata( log, container_name = CONTAINER_CONTAINER_NAME) { log: log, stream: CONTAINER_STREAM, @@ -1132,10 +1322,31 @@ stream: stream, time: CONTAINER_TIMESTAMP } end + # Docker Container + def docker_container_log_entry_with_metadata( + log, container_id = DOCKER_CONTAINER_ID, + container_name = DOCKER_CONTAINER_NAME, + stream = DOCKER_CONTAINER_STREAM_STDOUT) + { + log: log, + container_id: container_id, + container_name: container_name, + time: DOCKER_CONTAINER_TIMESTAMP, + source: stream + } + end + + def docker_container_log_entry(log) + { + log: log, + time: CONTAINER_TIMESTAMP + } + end + def cloudfunctions_log_entry(i) { stream: 'stdout', log: '[D][2015-09-25T12:34:56.789Z][123-0] ' + log_entry(i) } @@ -1153,10 +1364,19 @@ step: DATAFLOW_STEP_ID, message: log_entry(i) } end + def dataproc_log_entry(message, source_class = 'com.example.Example', + filename = 'test.log') + { + filename: filename, + class: source_class, + message: log_entry(message) + } + end + def ml_log_entry(i) { name: ML_LOG_AREA, message: log_entry(i) } @@ -1211,9 +1431,28 @@ i += 1 assert i <= n, "Number of entries #{i} exceeds expected number #{n}" end end assert i == n, "Number of entries #{i} does not match expected number #{n}" + end + + def verify_container_logs(log_entry_method, expected_params) + setup_gce_metadata_stubs + setup_container_metadata_stubs + [1, 2, 3, 5, 11, 50].each do |n| + @logs_sent = [] + setup_logging_stubs do + d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG) + n.times { |i| d.emit(log_entry_method.call(log_entry(i))) } + d.run + end + verify_log_entries(n, expected_params) do |entry| + assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], + entry + assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry + assert_equal CONTAINER_SEVERITY, entry['severity'], entry + end + end end # Replace the 'referer' field with nil. def http_request_message_with_nil_referer HTTP_REQUEST_MESSAGE.merge('referer' => nil)