test/plugin/base_test.rb in fluent-plugin-google-cloud-0.6.3 vs test/plugin/base_test.rb in fluent-plugin-google-cloud-0.6.4.pre.1
- old
+ new
@@ -97,34 +97,36 @@
d = create_driver
d.run
assert_equal PROJECT_ID, d.instance.project_id
assert_equal ZONE, d.instance.zone
assert_equal VM_ID, d.instance.vm_id
- assert_equal false, d.instance.running_on_managed_vm
+ assert_equal COMPUTE_CONSTANTS[:resource_type], d.instance.resource.type
end
def test_managed_vm_metadata_loading
setup_gce_metadata_stubs
setup_managed_vm_metadata_stubs
d = create_driver
d.run
assert_equal PROJECT_ID, d.instance.project_id
assert_equal ZONE, d.instance.zone
assert_equal VM_ID, d.instance.vm_id
- assert_equal true, d.instance.running_on_managed_vm
- assert_equal MANAGED_VM_BACKEND_NAME, d.instance.gae_backend_name
- assert_equal MANAGED_VM_BACKEND_VERSION, d.instance.gae_backend_version
+ assert_equal APPENGINE_CONSTANTS[:resource_type], d.instance.resource.type
+ assert_equal MANAGED_VM_BACKEND_NAME,
+ d.instance.resource.labels['module_id']
+ assert_equal MANAGED_VM_BACKEND_VERSION,
+ d.instance.resource.labels['version_id']
end
def test_gce_metadata_does_not_load_when_use_metadata_service_is_false
Fluent::GoogleCloudOutput.any_instance.expects(:fetch_metadata).never
d = create_driver(NO_METADATA_SERVICE_CONFIG + CUSTOM_METADATA_CONFIG)
d.run
assert_equal CUSTOM_PROJECT_ID, d.instance.project_id
assert_equal CUSTOM_ZONE, d.instance.zone
assert_equal CUSTOM_VM_ID, d.instance.vm_id
- assert_equal false, d.instance.running_on_managed_vm
+ assert_equal COMPUTE_CONSTANTS[:resource_type], d.instance.resource.type
end
def test_gce_used_when_detect_subservice_is_false
setup_gce_metadata_stubs
# This would cause the resource type to be container.googleapis.com if not
@@ -154,12 +156,10 @@
d = create_driver(config)
d.run
assert_equal parts[1], d.instance.project_id, "Index #{index} failed."
assert_equal parts[2], d.instance.zone, "Index #{index} failed."
assert_equal parts[3], d.instance.vm_id, "Index #{index} failed."
- assert_equal false, d.instance.running_on_managed_vm,
- "Index #{index} failed."
end
end
def test_ec2_metadata_requires_project_id
setup_ec2_metadata_stubs
@@ -699,84 +699,23 @@
end
verify_log_entries(n, VMENGINE_PARAMS)
end
end
- def test_one_container_log_metadata_from_plugin
- setup_gce_metadata_stubs
- setup_container_metadata_stubs
- setup_logging_stubs do
- d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG)
- d.emit(container_log_entry_with_metadata(log_entry(0)))
- d.run
- end
- verify_log_entries(1, CONTAINER_FROM_METADATA_PARAMS) do |entry|
- assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], entry
- assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry
- assert_equal CONTAINER_SEVERITY, entry['severity'], entry
- end
+ # Test container logs when metadata is extracted from the 'kubernetes' field
+ # in the log record.
+ def test_container_logs_metadata_from_record
+ verify_container_logs(method(:container_log_entry_with_metadata),
+ CONTAINER_FROM_METADATA_PARAMS)
end
- def test_multiple_container_logs_metadata_from_plugin
- setup_gce_metadata_stubs
- setup_container_metadata_stubs
- [2, 3, 5, 11, 50].each do |n|
- @logs_sent = []
- setup_logging_stubs do
- d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG)
- # The test driver doesn't clear its buffer of entries after running, so
- # do it manually here.
- d.instance_variable_get('@entries').clear
- n.times { |i| d.emit(container_log_entry_with_metadata(log_entry(i))) }
- d.run
- end
- verify_log_entries(n, CONTAINER_FROM_METADATA_PARAMS) do |entry|
- assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'],
- entry
- assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry
- assert_equal CONTAINER_SEVERITY, entry['severity'], entry
- end
- end
+ # Test container logs when metadata is extracted from the tag.
+ def test_container_logs_metadata_from_tag
+ verify_container_logs(method(:container_log_entry),
+ CONTAINER_FROM_TAG_PARAMS)
end
- def test_multiple_container_logs_metadata_from_tag
- setup_gce_metadata_stubs
- setup_container_metadata_stubs
- [2, 3, 5, 11, 50].each do |n|
- @logs_sent = []
- setup_logging_stubs do
- d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG)
- # The test driver doesn't clear its buffer of entries after running, so
- # do it manually here.
- d.instance_variable_get('@entries').clear
- n.times { |i| d.emit(container_log_entry(log_entry(i))) }
- d.run
- end
- verify_log_entries(n, CONTAINER_FROM_TAG_PARAMS) do |entry|
- assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'],
- entry
- assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry
- assert_equal CONTAINER_SEVERITY, entry['severity'], entry
- end
- end
- end
-
- def test_one_container_log_metadata_from_tag
- setup_gce_metadata_stubs
- setup_container_metadata_stubs
- setup_logging_stubs do
- d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG)
- d.emit(container_log_entry(log_entry(0)))
- d.run
- end
- verify_log_entries(1, CONTAINER_FROM_TAG_PARAMS) do |entry|
- assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], entry
- assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry
- assert_equal CONTAINER_SEVERITY, entry['severity'], entry
- end
- end
-
def test_one_container_log_from_tag_stderr
setup_gce_metadata_stubs
setup_container_metadata_stubs
setup_logging_stubs do
d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG)
@@ -837,10 +776,138 @@
assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry
assert_equal 'WARNING', entry['severity'], entry
end
end
+ # Docker Container.
+
+ # Test textPayload logs from Docker container stderr / stdout.
+ def test_text_payload_docker_container_logs
+ setup_gce_metadata_stubs
+ [1, 2, 3, 5, 11, 50].each do |n|
+ @logs_sent = []
+ setup_logging_stubs do
+ d = create_driver(APPLICATION_DEFAULT_CONFIG, DOCKER_CONTAINER_TAG)
+ # Generate log entries with 'log' (simply a string) and 'time' fields.
+ n.times { |i| d.emit(docker_container_log_entry(log_entry(i))) }
+ d.run
+ end
+ # 'container_id' and 'container_name' should have been extracted from tag
+ # and properly set in resource.labels and common metadata labels as in
+ # DOCKER_CONTAINER_PARAMS.
+ verify_log_entries(n, DOCKER_CONTAINER_PARAMS) do |entry|
+ # Timestamp in 'time' field from log entry should be set properly.
+ assert_equal DOCKER_CONTAINER_SECONDS_EPOCH,
+ entry['timestamp']['seconds'], entry
+ assert_equal DOCKER_CONTAINER_NANOS, entry['timestamp']['nanos'],
+ entry
+ # Severity is 'DEFAULT' because 'stream' info is absent from log entry.
+ assert_equal_with_default entry['severity'], 'DEFAULT', 'DEFAULT', entry
+ end
+ end
+ end
+
+ # Test jsonPayload logs from Docker container stderr / stdout.
+ def test_json_payload_docker_container_logs
+ setup_gce_metadata_stubs
+ [1, 2, 3, 5, 11, 50].each do |n|
+ @logs_sent = []
+ setup_logging_stubs do
+ d = create_driver(APPLICATION_DEFAULT_CONFIG, DOCKER_CONTAINER_TAG)
+ n.times do
+ # Generate log entries with 'log' (json) and 'time' fields.
+ d.emit(docker_container_log_entry('{"msg": "test log entry ' \
+ "#{n}" \
+ '", "tag2": "test", "data": ' \
+ '5000, "severity": "WARNING"}'))
+ end
+ d.run
+ end
+ # 'container_id' and 'container_name' should have been extracted from tag
+ # and properly set in resource.labels and common metadata labels as in
+ # DOCKER_CONTAINER_PARAMS.
+ verify_log_entries(n, DOCKER_CONTAINER_PARAMS, 'jsonPayload') do |entry|
+ # 'log' field should be detected as json and parsed properly.
+ fields = get_fields(entry['jsonPayload'])
+ assert_equal 3, fields.size, entry
+ assert_equal "test log entry #{n}", get_string(fields['msg']), entry
+ assert_equal 'test', get_string(fields['tag2']), entry
+ assert_equal 5000, get_number(fields['data']), entry
+ # Timestamp in 'time' field from log entry should be set properly.
+ assert_equal DOCKER_CONTAINER_SECONDS_EPOCH,
+ entry['timestamp']['seconds'], entry
+ assert_equal DOCKER_CONTAINER_NANOS, entry['timestamp']['nanos'], entry
+ # Severity in 'severity' field from log entry should be set properly.
+ assert_equal 'WARNING', entry['severity'], entry
+ end
+ end
+ end
+
+ # Test the following metadata is properly extracted from json record:
+ # * container_id
+ # * container_name
+ # * source (this field maps to the 'stream' metadata label.
+ def test_docker_container_logs_metadata_from_json_record
+ setup_gce_metadata_stubs
+ {
+ # 'stream' label should be extracted from json record when present.
+ docker_container_log_entry_with_metadata(
+ log_entry(0)
+ ) => {
+ # When the 'source' field from the json record has value 'stdout',
+ # 'severity' should be 'INFO'.
+ 'params' => DOCKER_CONTAINER_PARAMS_WITH_STREAM_STDOUT,
+ 'severity' => 'INFO'
+ },
+ # 'container_id' and 'container_name' can be extracted from tag or
+ # json record. If present in both cases, values in the json record will
+ # overwrite values in tags. For example:
+ # DOCKER_CONTAINER_PARAMS_WITH_METADATA_OVERWRITTEN has different
+ # 'container_id' and 'container_name' values from DOCKER_CONTAINER_TAG,
+ # and we need to verify these two fields get overwritten as expected.
+ docker_container_log_entry_with_metadata(
+ log_entry(0), DOCKER_CONTAINER_ID2, DOCKER_CONTAINER_NAME2,
+ DOCKER_CONTAINER_STREAM_STDERR
+ ) => {
+ # When the 'source' field from the json record has value 'stderr',
+ # 'severity' should be 'ERROR'.
+ 'params' => DOCKER_CONTAINER_PARAMS_WITH_METADATA_OVERWRITTEN,
+ 'severity' => 'ERROR'
+ }
+ }.each do |log_entry, expected|
+ @logs_sent = []
+ setup_logging_stubs do
+ d = create_driver(APPLICATION_DEFAULT_CONFIG, DOCKER_CONTAINER_TAG)
+ d.emit(log_entry)
+ d.run
+ end
+ verify_log_entries(1, expected['params']) do |entry|
+ assert_equal DOCKER_CONTAINER_SECONDS_EPOCH,
+ entry['timestamp']['seconds'], entry
+ assert_equal DOCKER_CONTAINER_NANOS, entry['timestamp']['nanos'],
+ entry
+ assert_equal_with_default entry['severity'], expected['severity'],
+ 'DEFAULT', entry
+ end
+ end
+ end
+
+ # Test logs from applications running in Docker containers.
+ def test_docker_container_application_logs
+ setup_gce_metadata_stubs
+ setup_docker_remote_api_stubs
+ setup_logging_stubs do
+ # Metadata Agent is not enabled. Will call Docker Remote API for
+ # container info.
+ d = create_driver(APPLICATION_DEFAULT_CONFIG,
+ DOCKER_CONTAINER_TAG_WITH_APPLICATION)
+ d.emit('message' => log_entry(0))
+ d.run
+ end
+ verify_log_entries(1, DOCKER_CONTAINER_WITH_APPLICATION_PARAMS)
+ end
+
def test_cloudfunctions_log
setup_gce_metadata_stubs
setup_cloudfunctions_metadata_stubs
[1, 2, 3, 5, 11, 50].each do |n|
setup_logging_stubs do
@@ -901,10 +968,21 @@
i += 1
end
end
end
+ def test_dataproc_log
+ setup_gce_metadata_stubs
+ setup_dataproc_metadata_stubs
+ setup_logging_stubs do
+ d = create_driver
+ d.emit(dataproc_log_entry('test message'))
+ d.run
+ end
+ verify_log_entries(1, DATAPROC_PARAMS, 'jsonPayload')
+ end
+
def test_http_request_from_record
setup_gce_metadata_stubs
setup_logging_stubs do
d = create_driver
d.emit('httpRequest' => HTTP_REQUEST_MESSAGE)
@@ -1001,10 +1079,76 @@
assert_nil get_fields(entry['jsonPayload'])['httpRequest'], entry
end
end
end
+ # Metadata Agent related tests.
+ def test_configure_enable_metadata_agent_default_and_false
+ setup_gce_metadata_stubs
+ [create_driver, create_driver(DISABLE_METADATA_AGENT_CONFIG)].each do |d|
+ assert_false d.instance.instance_variable_get(:@enable_metadata_agent)
+ end
+ end
+
+ def test_configure_enable_metadata_agent_true
+ setup_gce_metadata_stubs
+ setup_metadata_agent_stubs(IMPLICIT_MONITORED_RESOURCE_UNIQUE_KEY)
+ d = create_driver(ENABLE_METADATA_AGENT_CONFIG)
+ assert_true d.instance.instance_variable_get(:@enable_metadata_agent)
+ end
+
+ # Test an implicit monitored resource can be retrieved from Metadata Agent
+ # with an empty string as the locally-unique id.
+ def test_retrieve_implicit_monitored_resource
+ # Minimum set up for gce metadata stubs. Stubs for 'vm_id' and 'location'
+ # info are deliberately not set up.
+ setup_gce_metadata_stubs_minimum
+ setup_metadata_agent_stubs(IMPLICIT_MONITORED_RESOURCE_UNIQUE_KEY)
+ setup_logging_stubs do
+ d = create_driver(ENABLE_METADATA_AGENT_CONFIG)
+ d.emit('message' => log_entry(0))
+ d.run
+ end
+ verify_log_entries(1, COMPUTE_PARAMS)
+ end
+
+ # Test a docker container monitored resource can be retrieved from Metadata
+ # Agent with "container.<container_id>" as the locally-unique id.
+ def test_retrieve_docker_container_monitored_resource
+ setup_gce_metadata_stubs_minimum
+ setup_metadata_agent_stubs(IMPLICIT_MONITORED_RESOURCE_UNIQUE_KEY)
+ setup_metadata_agent_stubs(DOCKER_CONSTANTS[:resource_type])
+ setup_logging_stubs do
+ # Tag format here: "container.<container_id>.<container_name>"
+ d = create_driver(ENABLE_METADATA_AGENT_CONFIG, DOCKER_CONTAINER_TAG)
+ d.emit('message' => log_entry(0))
+ d.run
+ end
+ # gce_metadata_stubs has ZONE1, while metadata agent stub has ZONE2. Here we
+ # need to verify ZONE2 overwrites ZONE1 as expected.
+ verify_log_entries(1, DOCKER_CONTAINER_PARAMS_WITH_ZONE2)
+ end
+
+ # Test a docker container monitored resource can be retrieved from Metadata
+ # Agent with "container.<container_name>" as the locally-unique id when the
+ # log entry comes from an application running in the docker container.
+ def test_retrieve_docker_container_monitored_resource_with_application
+ setup_gce_metadata_stubs_minimum
+ setup_metadata_agent_stubs(IMPLICIT_MONITORED_RESOURCE_UNIQUE_KEY)
+ setup_metadata_agent_stubs(
+ "#{DOCKER_CONSTANTS[:resource_type]}_application")
+ setup_logging_stubs do
+ # Tag format here: "application-container.<container_name>.
+ # <additional_tag>"
+ d = create_driver(ENABLE_METADATA_AGENT_CONFIG,
+ DOCKER_CONTAINER_TAG_WITH_APPLICATION)
+ d.emit('message' => log_entry(0))
+ d.run
+ end
+ verify_log_entries(1, DOCKER_CONTAINER_WITH_APPLICATION_PARAMS)
+ end
+
private
def stub_metadata_request(metadata_path, response_body)
stub_request(:get, 'http://169.254.169.254/computeMetadata/v1/' +
metadata_path)
@@ -1037,10 +1181,30 @@
status: 200,
headers: { 'Content-Length' => FAKE_AUTH_TOKEN.length,
'Content-Type' => 'application/json' })
end
+ # The minimum stubs needed for infomation that Metadata Agent can not provide.
+ def setup_gce_metadata_stubs_minimum
+ # Stub the root, used for platform detection by the plugin and 'googleauth'.
+ stub_request(:get, 'http://169.254.169.254')
+ .to_return(status: 200, headers: { 'Metadata-Flavor' => 'Google' })
+
+ # Create stubs for all the GCE metadata lookups the agent needs to make.
+ stub_metadata_request('project/project-id', PROJECT_ID)
+ stub_metadata_request('instance/attributes/',
+ "attribute1\nattribute2\nattribute3")
+
+ # Used by 'googleauth' to fetch the default service account credentials.
+ stub_request(:get, 'http://169.254.169.254/computeMetadata/v1/' \
+ 'instance/service-accounts/default/token')
+ .to_return(body: %({"access_token": "#{FAKE_AUTH_TOKEN}"}),
+ status: 200,
+ headers: { 'Content-Length' => FAKE_AUTH_TOKEN.length,
+ 'Content-Type' => 'application/json' })
+ end
+
def setup_ec2_metadata_stubs
# Stub the root, used for platform detection.
stub_request(:get, 'http://169.254.169.254')
.to_return(status: 200, headers: { 'Server' => 'EC2ws' })
@@ -1087,10 +1251,15 @@
'INSTANCE_PREFIX: '\
"gke-#{CONTAINER_CLUSTER_NAME}-740fdafa\n"\
'KUBE_BEARER_TOKEN: AoQiMuwkNP2BMT0S')
end
+ def setup_docker_remote_api_stubs
+ stub_request(:get, "http://unix/containers/#{DOCKER_CONTAINER_NAME}/json")
+ .to_return(status: 200, body: "{\"Id\":\"#{DOCKER_CONTAINER_ID}\"}")
+ end
+
def setup_cloudfunctions_metadata_stubs
stub_metadata_request(
'instance/attributes/',
"attribute1\nkube-env\ngcf_region\nlast_attribute")
stub_metadata_request('instance/attributes/kube-env',
@@ -1100,15 +1269,36 @@
'KUBE_BEARER_TOKEN: AoQiMuwkNP2BMT0S')
stub_metadata_request('instance/attributes/gcf_region',
CLOUDFUNCTIONS_REGION)
end
+ def setup_dataproc_metadata_stubs
+ stub_metadata_request(
+ 'instance/attributes/',
+ "attribute1\ndataproc-cluster-uuid\ndataproc-cluster-name")
+ stub_metadata_request('instance/attributes/dataproc-cluster-name',
+ DATAPROC_CLUSTER_NAME)
+ stub_metadata_request('instance/attributes/dataproc-cluster-uuid',
+ DATAPROC_CLUSTER_UUID)
+ stub_metadata_request('instance/attributes/dataproc-region',
+ DATAPROC_REGION)
+ end
+
+ def setup_metadata_agent_stubs(resource_type)
+ unique_id = MONITORED_RESOURCE_STUBS[resource_type]['unique_id']
+ resource = MONITORED_RESOURCE_STUBS[resource_type]['monitored_resource']
+ stub_request(:get, 'http://local-metadata-agent.stackdriver.com:8000' \
+ "/monitoredResource/#{unique_id}")
+ .to_return(status: 200, body: resource)
+ end
+
def container_tag_with_container_name(container_name)
"kubernetes.#{CONTAINER_POD_NAME}_#{CONTAINER_NAMESPACE_NAME}_" \
"#{container_name}"
end
+ # GKE Container
def container_log_entry_with_metadata(
log, container_name = CONTAINER_CONTAINER_NAME)
{
log: log,
stream: CONTAINER_STREAM,
@@ -1132,10 +1322,31 @@
stream: stream,
time: CONTAINER_TIMESTAMP
}
end
+ # Docker Container
+ def docker_container_log_entry_with_metadata(
+ log, container_id = DOCKER_CONTAINER_ID,
+ container_name = DOCKER_CONTAINER_NAME,
+ stream = DOCKER_CONTAINER_STREAM_STDOUT)
+ {
+ log: log,
+ container_id: container_id,
+ container_name: container_name,
+ time: DOCKER_CONTAINER_TIMESTAMP,
+ source: stream
+ }
+ end
+
+ def docker_container_log_entry(log)
+ {
+ log: log,
+ time: CONTAINER_TIMESTAMP
+ }
+ end
+
def cloudfunctions_log_entry(i)
{
stream: 'stdout',
log: '[D][2015-09-25T12:34:56.789Z][123-0] ' + log_entry(i)
}
@@ -1153,10 +1364,19 @@
step: DATAFLOW_STEP_ID,
message: log_entry(i)
}
end
+ def dataproc_log_entry(message, source_class = 'com.example.Example',
+ filename = 'test.log')
+ {
+ filename: filename,
+ class: source_class,
+ message: log_entry(message)
+ }
+ end
+
def ml_log_entry(i)
{
name: ML_LOG_AREA,
message: log_entry(i)
}
@@ -1211,9 +1431,28 @@
i += 1
assert i <= n, "Number of entries #{i} exceeds expected number #{n}"
end
end
assert i == n, "Number of entries #{i} does not match expected number #{n}"
+ end
+
+ def verify_container_logs(log_entry_method, expected_params)
+ setup_gce_metadata_stubs
+ setup_container_metadata_stubs
+ [1, 2, 3, 5, 11, 50].each do |n|
+ @logs_sent = []
+ setup_logging_stubs do
+ d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG)
+ n.times { |i| d.emit(log_entry_method.call(log_entry(i))) }
+ d.run
+ end
+ verify_log_entries(n, expected_params) do |entry|
+ assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'],
+ entry
+ assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry
+ assert_equal CONTAINER_SEVERITY, entry['severity'], entry
+ end
+ end
end
# Replace the 'referer' field with nil.
def http_request_message_with_nil_referer
HTTP_REQUEST_MESSAGE.merge('referer' => nil)