test/plugin/base_test.rb in fluent-plugin-google-cloud-0.5.6 vs test/plugin/base_test.rb in fluent-plugin-google-cloud-0.6.0.v2.alpha.1
- old
+ new
@@ -17,10 +17,12 @@
require 'mocha/test_unit'
require 'webmock/test_unit'
# Unit tests for Google Cloud Logging plugin
module BaseTest
+ include Fluent::GoogleCloudOutput::Constants
+
def setup
Fluent::Test.setup
# delete environment variables that googleauth uses to find credentials.
ENV.delete('GOOGLE_APPLICATION_CREDENTIALS')
# service account env.
@@ -38,10 +40,12 @@
end
# generic attributes
HOSTNAME = Socket.gethostname
+ WRITE_LOG_ENTRIES_URI = 'https://logging.googleapis.com/v2beta1/entries:write'
+
# attributes used for the GCE metadata service
PROJECT_ID = 'test-project-id'
ZONE = 'us-central1-b'
FULLY_QUALIFIED_ZONE = 'projects/' + PROJECT_ID + '/zones/' + ZONE
VM_ID = '9876543210'
@@ -94,10 +98,25 @@
CLOUDFUNCTIONS_CLUSTER_NAME = 'cluster-1'
CLOUDFUNCTIONS_NAMESPACE_NAME = 'default'
CLOUDFUNCTIONS_POD_NAME = 'd.dc.myu.uc.functionp.pc.name-a.a1.987-c0l82'
CLOUDFUNCTIONS_CONTAINER_NAME = 'worker'
+ # Dataflow specific labels
+ DATAFLOW_REGION = 'us-central1'
+ DATAFLOW_JOB_NAME = 'job_name_1'
+ DATAFLOW_JOB_ID = 'job_id_1'
+ DATAFLOW_STEP_ID = 'step_1'
+ DATAFLOW_TAG = 'dataflow.googleapis.com/worker'
+
+ # ML specific labels
+ ML_REGION = 'us-central1'
+ ML_JOB_ID = 'job_name_1'
+ ML_TASK_NAME = 'task_name_1'
+ ML_TRIAL_ID = 'trial_id_1'
+ ML_LOG_AREA = 'log_area_1'
+ ML_TAG = 'master-replica-0'
+
# Parameters used for authentication
AUTH_GRANT_TYPE = 'urn:ietf:params:oauth:grant-type:jwt-bearer'
FAKE_AUTH_TOKEN = 'abc123'
# Information about test credentials files.
@@ -169,147 +188,223 @@
CONFIG_EC2_PROJECT_ID_AND_CUSTOM_VM_ID = %(
project_id #{EC2_PROJECT_ID}
vm_id #{CUSTOM_VM_ID}
)
- # Service configurations for various services
- COMPUTE_SERVICE_NAME = 'compute.googleapis.com'
- APPENGINE_SERVICE_NAME = 'appengine.googleapis.com'
- CONTAINER_SERVICE_NAME = 'container.googleapis.com'
- CLOUDFUNCTIONS_SERVICE_NAME = 'cloudfunctions.googleapis.com'
- EC2_SERVICE_NAME = 'ec2.amazonaws.com'
+ CONFIG_DATAFLOW = %(
+ subservice_name "#{DATAFLOW_CONSTANTS[:service]}"
+ labels {
+ "#{DATAFLOW_CONSTANTS[:service]}/region" : "#{DATAFLOW_REGION}",
+ "#{DATAFLOW_CONSTANTS[:service]}/job_name" : "#{DATAFLOW_JOB_NAME}",
+ "#{DATAFLOW_CONSTANTS[:service]}/job_id" : "#{DATAFLOW_JOB_ID}"
+ }
+ label_map { "step": "#{DATAFLOW_CONSTANTS[:service]}/step_id" }
+ )
+ CONFIG_ML = %(
+ subservice_name "#{ML_CONSTANTS[:service]}"
+ labels {
+ "#{ML_CONSTANTS[:service]}/job_id" : "#{ML_JOB_ID}",
+ "#{ML_CONSTANTS[:service]}/task_name" : "#{ML_TASK_NAME}",
+ "#{ML_CONSTANTS[:service]}/trial_id" : "#{ML_TRIAL_ID}"
+ }
+ label_map { "name": "#{ML_CONSTANTS[:service]}/job_id/log_area" }
+ )
+
+ # Service configurations for various services
COMPUTE_PARAMS = {
- service_name: COMPUTE_SERVICE_NAME,
+ resource: {
+ type: COMPUTE_CONSTANTS[:resource_type],
+ labels: {
+ 'instance_id' => VM_ID,
+ 'zone' => ZONE
+ }
+ },
log_name: 'test',
project_id: PROJECT_ID,
- zone: ZONE,
labels: {
- "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance',
- "#{COMPUTE_SERVICE_NAME}/resource_id" => VM_ID,
- "#{COMPUTE_SERVICE_NAME}/resource_name" => HOSTNAME
+ "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME
}
}
VMENGINE_PARAMS = {
- service_name: APPENGINE_SERVICE_NAME,
- log_name: "#{APPENGINE_SERVICE_NAME}%2Ftest",
+ resource: {
+ type: APPENGINE_CONSTANTS[:resource_type],
+ labels: {
+ 'module_id' => MANAGED_VM_BACKEND_NAME,
+ 'version_id' => MANAGED_VM_BACKEND_VERSION
+ }
+ },
+ log_name: "#{APPENGINE_CONSTANTS[:service]}%2Ftest",
project_id: PROJECT_ID,
- zone: ZONE,
labels: {
- "#{APPENGINE_SERVICE_NAME}/module_id" => MANAGED_VM_BACKEND_NAME,
- "#{APPENGINE_SERVICE_NAME}/version_id" => MANAGED_VM_BACKEND_VERSION,
- "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance',
- "#{COMPUTE_SERVICE_NAME}/resource_id" => VM_ID,
- "#{COMPUTE_SERVICE_NAME}/resource_name" => HOSTNAME
+ "#{COMPUTE_CONSTANTS[:service]}/resource_id" => VM_ID,
+ "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME,
+ "#{COMPUTE_CONSTANTS[:service]}/zone" => ZONE
}
}
CONTAINER_TAG = "kubernetes.#{CONTAINER_POD_NAME}_" \
"#{CONTAINER_NAMESPACE_NAME}_#{CONTAINER_CONTAINER_NAME}"
CONTAINER_FROM_METADATA_PARAMS = {
- service_name: CONTAINER_SERVICE_NAME,
+ resource: {
+ type: CONTAINER_CONSTANTS[:resource_type],
+ labels: {
+ 'cluster_name' => CONTAINER_CLUSTER_NAME,
+ 'namespace_id' => CONTAINER_NAMESPACE_ID,
+ 'instance_id' => VM_ID,
+ 'pod_id' => CONTAINER_POD_ID,
+ 'container_name' => CONTAINER_CONTAINER_NAME,
+ 'zone' => ZONE
+ }
+ },
log_name: CONTAINER_CONTAINER_NAME,
project_id: PROJECT_ID,
- zone: ZONE,
labels: {
- "#{CONTAINER_SERVICE_NAME}/instance_id" => VM_ID,
- "#{CONTAINER_SERVICE_NAME}/cluster_name" => CONTAINER_CLUSTER_NAME,
- "#{CONTAINER_SERVICE_NAME}/namespace_name" => CONTAINER_NAMESPACE_NAME,
- "#{CONTAINER_SERVICE_NAME}/namespace_id" => CONTAINER_NAMESPACE_ID,
- "#{CONTAINER_SERVICE_NAME}/pod_name" => CONTAINER_POD_NAME,
- "#{CONTAINER_SERVICE_NAME}/pod_id" => CONTAINER_POD_ID,
- "#{CONTAINER_SERVICE_NAME}/container_name" => CONTAINER_CONTAINER_NAME,
- "#{CONTAINER_SERVICE_NAME}/stream" => CONTAINER_STREAM,
+ "#{CONTAINER_CONSTANTS[:service]}/namespace_name" =>
+ CONTAINER_NAMESPACE_NAME,
+ "#{CONTAINER_CONSTANTS[:service]}/pod_name" => CONTAINER_POD_NAME,
+ "#{CONTAINER_CONSTANTS[:service]}/stream" => CONTAINER_STREAM,
"label/#{CONTAINER_LABEL_KEY}" => CONTAINER_LABEL_VALUE,
- "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance',
- "#{COMPUTE_SERVICE_NAME}/resource_id" => VM_ID,
- "#{COMPUTE_SERVICE_NAME}/resource_name" => HOSTNAME
+ "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME
}
}
# Almost the same as from metadata, but missing namespace_id and pod_id.
CONTAINER_FROM_TAG_PARAMS = {
- service_name: CONTAINER_SERVICE_NAME,
+ resource: {
+ type: CONTAINER_CONSTANTS[:resource_type],
+ labels: {
+ 'cluster_name' => CONTAINER_CLUSTER_NAME,
+ 'instance_id' => VM_ID,
+ 'container_name' => CONTAINER_CONTAINER_NAME,
+ 'zone' => ZONE
+ }
+ },
log_name: CONTAINER_CONTAINER_NAME,
project_id: PROJECT_ID,
- zone: ZONE,
labels: {
- "#{CONTAINER_SERVICE_NAME}/instance_id" => VM_ID,
- "#{CONTAINER_SERVICE_NAME}/cluster_name" => CONTAINER_CLUSTER_NAME,
- "#{CONTAINER_SERVICE_NAME}/namespace_name" => CONTAINER_NAMESPACE_NAME,
- "#{CONTAINER_SERVICE_NAME}/pod_name" => CONTAINER_POD_NAME,
- "#{CONTAINER_SERVICE_NAME}/container_name" => CONTAINER_CONTAINER_NAME,
- "#{CONTAINER_SERVICE_NAME}/stream" => CONTAINER_STREAM,
- "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance',
- "#{COMPUTE_SERVICE_NAME}/resource_id" => VM_ID,
- "#{COMPUTE_SERVICE_NAME}/resource_name" => HOSTNAME
+ "#{CONTAINER_CONSTANTS[:service]}/namespace_name" =>
+ CONTAINER_NAMESPACE_NAME,
+ "#{CONTAINER_CONSTANTS[:service]}/pod_name" => CONTAINER_POD_NAME,
+ "#{CONTAINER_CONSTANTS[:service]}/stream" => CONTAINER_STREAM,
+ "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME
}
}
CLOUDFUNCTIONS_TAG = "kubernetes.#{CLOUDFUNCTIONS_POD_NAME}_" \
"#{CLOUDFUNCTIONS_NAMESPACE_NAME}_" \
"#{CLOUDFUNCTIONS_CONTAINER_NAME}"
CLOUDFUNCTIONS_PARAMS = {
- service_name: CLOUDFUNCTIONS_SERVICE_NAME,
+ resource: {
+ type: CLOUDFUNCTIONS_CONSTANTS[:resource_type],
+ labels: {
+ 'function_name' => CLOUDFUNCTIONS_FUNCTION_NAME,
+ 'region' => CLOUDFUNCTIONS_REGION
+ }
+ },
log_name: 'cloud-functions',
project_id: PROJECT_ID,
- zone: ZONE,
labels: {
'execution_id' => CLOUDFUNCTIONS_EXECUTION_ID,
- "#{CLOUDFUNCTIONS_SERVICE_NAME}/function_name" =>
- CLOUDFUNCTIONS_FUNCTION_NAME,
- "#{CLOUDFUNCTIONS_SERVICE_NAME}/region" => CLOUDFUNCTIONS_REGION,
- "#{CONTAINER_SERVICE_NAME}/instance_id" => VM_ID,
- "#{CONTAINER_SERVICE_NAME}/cluster_name" => CLOUDFUNCTIONS_CLUSTER_NAME,
- "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance',
- "#{COMPUTE_SERVICE_NAME}/resource_id" => VM_ID,
- "#{COMPUTE_SERVICE_NAME}/resource_name" => HOSTNAME
+ "#{CONTAINER_CONSTANTS[:service]}/instance_id" => VM_ID,
+ "#{CONTAINER_CONSTANTS[:service]}/cluster_name" =>
+ CLOUDFUNCTIONS_CLUSTER_NAME,
+ "#{COMPUTE_CONSTANTS[:service]}/resource_id" => VM_ID,
+ "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME,
+ "#{COMPUTE_CONSTANTS[:service]}/zone" => ZONE
}
}
CLOUDFUNCTIONS_TEXT_NOT_MATCHED_PARAMS = {
- service_name: CLOUDFUNCTIONS_SERVICE_NAME,
+ resource: {
+ type: CLOUDFUNCTIONS_CONSTANTS[:resource_type],
+ labels: {
+ 'function_name' => CLOUDFUNCTIONS_FUNCTION_NAME,
+ 'region' => CLOUDFUNCTIONS_REGION
+ }
+ },
log_name: 'cloud-functions',
project_id: PROJECT_ID,
- zone: ZONE,
labels: {
- "#{CLOUDFUNCTIONS_SERVICE_NAME}/function_name" =>
- CLOUDFUNCTIONS_FUNCTION_NAME,
- "#{CLOUDFUNCTIONS_SERVICE_NAME}/region" => CLOUDFUNCTIONS_REGION,
- "#{CONTAINER_SERVICE_NAME}/instance_id" => VM_ID,
- "#{CONTAINER_SERVICE_NAME}/cluster_name" => CLOUDFUNCTIONS_CLUSTER_NAME,
- "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance',
- "#{COMPUTE_SERVICE_NAME}/resource_id" => VM_ID,
- "#{COMPUTE_SERVICE_NAME}/resource_name" => HOSTNAME
+ "#{CONTAINER_CONSTANTS[:service]}/instance_id" => VM_ID,
+ "#{CONTAINER_CONSTANTS[:service]}/cluster_name" =>
+ CLOUDFUNCTIONS_CLUSTER_NAME,
+ "#{COMPUTE_CONSTANTS[:service]}/resource_id" => VM_ID,
+ "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME,
+ "#{COMPUTE_CONSTANTS[:service]}/zone" => ZONE
}
}
+ DATAFLOW_PARAMS = {
+ resource: {
+ type: DATAFLOW_CONSTANTS[:resource_type],
+ labels: {
+ 'job_name' => DATAFLOW_JOB_NAME,
+ 'job_id' => DATAFLOW_JOB_ID,
+ 'step_id' => DATAFLOW_STEP_ID,
+ 'region' => DATAFLOW_REGION
+ }
+ },
+ log_name: DATAFLOW_TAG,
+ project_id: PROJECT_ID,
+ labels: {
+ "#{COMPUTE_CONSTANTS[:service]}/resource_id" => VM_ID,
+ "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME,
+ "#{COMPUTE_CONSTANTS[:service]}/zone" => ZONE
+ }
+ }
+
+ ML_PARAMS = {
+ resource: {
+ type: ML_CONSTANTS[:resource_type],
+ labels: {
+ 'job_id' => ML_JOB_ID,
+ 'task_name' => ML_TASK_NAME
+ }
+ },
+ log_name: ML_TAG,
+ project_id: PROJECT_ID,
+ labels: {
+ "#{ML_CONSTANTS[:service]}/trial_id" => ML_TRIAL_ID,
+ "#{ML_CONSTANTS[:service]}/job_id/log_area" => ML_LOG_AREA,
+ "#{COMPUTE_CONSTANTS[:service]}/resource_id" => VM_ID,
+ "#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME,
+ "#{COMPUTE_CONSTANTS[:service]}/zone" => ZONE
+ }
+ }
+
CUSTOM_PARAMS = {
- service_name: COMPUTE_SERVICE_NAME,
+ resource: {
+ type: COMPUTE_CONSTANTS[:resource_type],
+ labels: {
+ 'instance_id' => CUSTOM_VM_ID,
+ 'zone' => CUSTOM_ZONE
+ }
+ },
log_name: 'test',
project_id: CUSTOM_PROJECT_ID,
- zone: CUSTOM_ZONE,
labels: {
- "#{COMPUTE_SERVICE_NAME}/resource_type" => 'instance',
- "#{COMPUTE_SERVICE_NAME}/resource_id" => CUSTOM_VM_ID,
- "#{COMPUTE_SERVICE_NAME}/resource_name" => CUSTOM_HOSTNAME
+ "#{COMPUTE_CONSTANTS[:service]}/resource_name" => CUSTOM_HOSTNAME
}
}
EC2_PARAMS = {
- service_name: EC2_SERVICE_NAME,
+ resource: {
+ type: EC2_CONSTANTS[:resource_type],
+ labels: {
+ 'instance_id' => EC2_VM_ID,
+ 'region' => EC2_PREFIXED_ZONE,
+ 'aws_account' => EC2_ACCOUNT_ID
+ }
+ },
log_name: 'test',
project_id: EC2_PROJECT_ID,
- zone: EC2_PREFIXED_ZONE,
labels: {
- "#{EC2_SERVICE_NAME}/resource_type" => 'instance',
- "#{EC2_SERVICE_NAME}/resource_id" => EC2_VM_ID,
- "#{EC2_SERVICE_NAME}/account_id" => EC2_ACCOUNT_ID,
- "#{EC2_SERVICE_NAME}/resource_name" => HOSTNAME
+ "#{EC2_CONSTANTS[:service]}/resource_name" => HOSTNAME
}
}
HTTP_REQUEST_MESSAGE = {
'requestMethod' => 'POST',
@@ -432,16 +527,16 @@
assert_equal false, d.instance.running_on_managed_vm
end
def test_gce_used_when_detect_subservice_is_false
setup_gce_metadata_stubs
- # This would cause the service to be container.googleapis.com if not for the
- # detect_subservice=false config.
+ # This would cause the resource type to be container.googleapis.com if not
+ # for the detect_subservice=false config.
setup_container_metadata_stubs
d = create_driver(NO_DETECT_SUBSERVICE_CONFIG)
d.run
- assert_equal COMPUTE_SERVICE_NAME, d.instance.service_name
+ assert_equal COMPUTE_CONSTANTS[:resource_type], d.instance.resource.type
end
def test_metadata_overrides
{
# In this case we are overriding all configured parameters so we should
@@ -552,29 +647,29 @@
d.run
end
verify_log_entries(1, EC2_PARAMS)
end
- def test_struct_payload_log
+ def test_structured_payload_log
setup_gce_metadata_stubs
setup_logging_stubs do
d = create_driver
d.emit('msg' => log_entry(0), 'tag2' => 'test', 'data' => 5000,
'some_null_field' => nil)
d.run
end
- verify_log_entries(1, COMPUTE_PARAMS, 'structPayload') do |entry|
- fields = get_fields(entry['structPayload'])
+ verify_log_entries(1, COMPUTE_PARAMS, 'jsonPayload') do |entry|
+ fields = get_fields(entry['jsonPayload'])
assert_equal 4, fields.size, entry
assert_equal 'test log entry 0', get_string(fields['msg']), entry
assert_equal 'test', get_string(fields['tag2']), entry
assert_equal 5000, get_number(fields['data']), entry
assert_equal null_value, fields['some_null_field'], entry
end
end
- def test_struct_payload_malformatted_log
+ def test_structured_payload_malformatted_log
setup_gce_metadata_stubs
message = 'test message'
setup_logging_stubs do
d = create_driver
d.emit(
@@ -586,12 +681,12 @@
'symbol_key' => { some_symbol: message },
'nil_key' => { nil => message }
)
d.run
end
- verify_log_entries(1, COMPUTE_PARAMS, 'structPayload') do |entry|
- fields = get_fields(entry['structPayload'])
+ verify_log_entries(1, COMPUTE_PARAMS, 'jsonPayload') do |entry|
+ fields = get_fields(entry['jsonPayload'])
assert_equal 7, fields.size, entry
assert_equal message, get_string(get_fields(get_struct(fields \
['int_key']))['1']), entry
assert_equal message, get_string(get_fields(get_struct(fields \
['int_array_key']))['[1, 2, 3, 4]']), entry
@@ -606,11 +701,11 @@
assert_equal message, get_string(get_fields(get_struct(fields \
['nil_key']))['']), entry
end
end
- def test_struct_payload_json_log
+ def test_structured_payload_json_log
setup_gce_metadata_stubs
setup_logging_stubs do
d = create_driver
json_string = '{"msg": "test log entry 0", "tag2": "test", "data": 5000}'
d.emit('message' => 'notJSON ' + json_string)
@@ -622,11 +717,11 @@
verify_log_entries(4, COMPUTE_PARAMS, '') do |entry|
assert entry.key?('textPayload'), 'Entry did not have textPayload'
end
end
- def test_struct_payload_json_container_log
+ def test_structured_payload_json_container_log
setup_gce_metadata_stubs
setup_container_metadata_stubs
setup_logging_stubs do
d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG)
json_string = '{"msg": "test log entry 0", "tag2": "test", ' \
@@ -641,12 +736,12 @@
3, CONTAINER_FROM_METADATA_PARAMS, '') do |entry|
log_index += 1
if log_index == 1
assert entry.key?('textPayload'), 'Entry did not have textPayload'
else
- assert entry.key?('structPayload'), 'Entry did not have structPayload'
- fields = get_fields(entry['structPayload'])
+ assert entry.key?('jsonPayload'), 'Entry did not have jsonPayload'
+ fields = get_fields(entry['jsonPayload'])
assert_equal 4, fields.size, entry
assert_equal 'test log entry 0', get_string(fields['msg']), entry
assert_equal 'test', get_string(fields['tag2']), entry
assert_equal 5000, get_number(fields['data']), entry
assert_equal null_value, fields['some_null_field'], entry
@@ -663,11 +758,11 @@
@logs_sent = []
d = create_driver(REQUIRE_VALID_TAGS_CONFIG, tag)
d.emit('msg' => log_entry(0))
d.run
end
- verify_log_entries(0, COMPUTE_PARAMS, 'structPayload')
+ verify_log_entries(0, COMPUTE_PARAMS, 'jsonPayload')
end
end
# Verify that empty string container name should fail the kubernetes regex
# match, thus the original tag is used as the log name.
@@ -676,22 +771,21 @@
setup_container_metadata_stubs
container_name = ''
# This tag will not match the kubernetes regex because it requires a
# non-empty container name.
tag = container_tag_with_container_name(container_name)
- params = CONTAINER_FROM_METADATA_PARAMS.merge(
- labels: CONTAINER_FROM_METADATA_PARAMS[:labels].merge(
- "#{CONTAINER_SERVICE_NAME}/container_name" => container_name),
- log_name: tag)
- setup_logging_stubs([params]) do
- @logs_sent = []
+ setup_logging_stubs do
d = create_driver(REQUIRE_VALID_TAGS_CONFIG, tag)
d.emit(container_log_entry_with_metadata(log_entry(0), container_name))
d.run
end
+ params = CONTAINER_FROM_METADATA_PARAMS.merge(
+ resource: CONTAINER_FROM_METADATA_PARAMS[:resource].merge(
+ labels: CONTAINER_FROM_METADATA_PARAMS[:resource][:labels].merge(
+ 'container_name' => container_name)),
+ log_name: tag)
verify_log_entries(1, params, 'textPayload')
- assert_equal "projects/#{PROJECT_ID}/logs/#{tag}", @logs_sent[0]['logName']
end
# Verify that container names with non-utf8 characters should be rejected when
# 'require_valid_tags' is true.
def test_reject_non_utf8_container_name_with_require_valid_tags_true
@@ -699,80 +793,77 @@
setup_container_metadata_stubs
non_utf8_tags = INVALID_TAGS.select do |tag, _|
tag.is_a?(String) && !tag.empty?
end
non_utf8_tags.each do |container_name, encoded_name|
- params = CONTAINER_FROM_METADATA_PARAMS.merge(
- labels: CONTAINER_FROM_METADATA_PARAMS[:labels].merge(
- "#{CONTAINER_SERVICE_NAME}/container_name" =>
- URI.decode(encoded_name)),
- log_name: encoded_name)
- setup_logging_stubs([params]) do
+ setup_logging_stubs do
@logs_sent = []
d = create_driver(REQUIRE_VALID_TAGS_CONFIG,
container_tag_with_container_name(container_name))
d.emit(container_log_entry_with_metadata(log_entry(0), container_name))
d.run
end
+ params = CONTAINER_FROM_METADATA_PARAMS.merge(
+ labels: CONTAINER_FROM_METADATA_PARAMS[:labels].merge(
+ "#{CONTAINER_CONSTANTS[:service]}/container_name" =>
+ URI.decode(encoded_name)),
+ log_name: encoded_name)
verify_log_entries(0, params, 'textPayload')
end
end
# Verify that tags are properly encoded. When 'require_valid_tags' is true, we
# only accept string tags with utf8 characters.
def test_encode_tags_with_require_valid_tags_true
setup_gce_metadata_stubs
VALID_TAGS.each do |tag, encoded_tag|
- setup_logging_stubs([COMPUTE_PARAMS.merge(log_name: encoded_tag)]) do
+ setup_logging_stubs do
@logs_sent = []
d = create_driver(REQUIRE_VALID_TAGS_CONFIG, tag)
d.emit('msg' => log_entry(0))
d.run
end
- verify_log_entries(1, COMPUTE_PARAMS, 'structPayload')
- assert_equal "projects/#{PROJECT_ID}/logs/#{encoded_tag}",
- @logs_sent[0]['logName']
+ verify_log_entries(1, COMPUTE_PARAMS.merge(log_name: encoded_tag),
+ 'jsonPayload')
end
end
# Verify that tags extracted from container names are properly encoded.
def test_encode_tags_from_container_name_with_require_valid_tags_true
setup_gce_metadata_stubs
setup_container_metadata_stubs
VALID_TAGS.each do |tag, encoded_tag|
- params = CONTAINER_FROM_METADATA_PARAMS.merge(
- labels: CONTAINER_FROM_METADATA_PARAMS[:labels].merge(
- "#{CONTAINER_SERVICE_NAME}/container_name" => tag),
- log_name: encoded_tag)
- setup_logging_stubs([params]) do
+ setup_logging_stubs do
@logs_sent = []
d = create_driver(REQUIRE_VALID_TAGS_CONFIG,
container_tag_with_container_name(tag))
d.emit(container_log_entry_with_metadata(log_entry(0), tag))
d.run
end
+ params = CONTAINER_FROM_METADATA_PARAMS.merge(
+ resource: CONTAINER_FROM_METADATA_PARAMS[:resource].merge(
+ labels: CONTAINER_FROM_METADATA_PARAMS[:resource][:labels].merge(
+ 'container_name' => tag)),
+ log_name: encoded_tag)
verify_log_entries(1, params, 'textPayload')
- assert_equal "projects/#{PROJECT_ID}/logs/#{encoded_tag}",
- @logs_sent[0]['logName']
end
end
# Verify that tags are properly encoded and sanitized. When
# 'require_valid_tags' is false, we try to convert any non-string tags to
# strings, and replace non-utf8 characters with a replacement string.
def test_sanitize_tags_with_require_valid_tags_false
setup_gce_metadata_stubs
ALL_TAGS.each do |tag, sanitized_tag|
- setup_logging_stubs([COMPUTE_PARAMS.merge(log_name: sanitized_tag)]) do
+ setup_logging_stubs do
@logs_sent = []
d = create_driver(APPLICATION_DEFAULT_CONFIG, tag)
d.emit('msg' => log_entry(0))
d.run
end
- verify_log_entries(1, COMPUTE_PARAMS, 'structPayload')
- assert_equal "projects/#{PROJECT_ID}/logs/#{sanitized_tag}",
- @logs_sent[0]['logName']
+ verify_log_entries(1, COMPUTE_PARAMS.merge(log_name: sanitized_tag),
+ 'jsonPayload')
end
end
# Verify that tags extracted from container names are properly encoded and
# sanitized.
@@ -785,25 +876,23 @@
# non-empty string cases here.
string_tags = ALL_TAGS.select { |tag, _| tag.is_a?(String) && !tag.empty? }
string_tags.each do |container_name, encoded_container_name|
# Container name in the label is sanitized but not encoded, while the log
# name is encoded.
- params = CONTAINER_FROM_METADATA_PARAMS.merge(
- labels: CONTAINER_FROM_METADATA_PARAMS[:labels].merge(
- "#{CONTAINER_SERVICE_NAME}/container_name" =>
- URI.decode(encoded_container_name)),
- log_name: encoded_container_name)
- setup_logging_stubs([params]) do
+ setup_logging_stubs do
@logs_sent = []
d = create_driver(APPLICATION_DEFAULT_CONFIG,
container_tag_with_container_name(container_name))
d.emit(container_log_entry_with_metadata(log_entry(0), container_name))
d.run
end
+ params = CONTAINER_FROM_METADATA_PARAMS.merge(
+ resource: CONTAINER_FROM_METADATA_PARAMS[:resource].merge(
+ labels: CONTAINER_FROM_METADATA_PARAMS[:resource][:labels].merge(
+ 'container_name' => URI.decode(encoded_container_name))),
+ log_name: encoded_container_name)
verify_log_entries(1, params, 'textPayload')
- assert_equal "projects/#{PROJECT_ID}/logs/#{encoded_container_name}",
- @logs_sent[0]['logName']
end
end
def test_timestamps
setup_gce_metadata_stubs
@@ -831,36 +920,36 @@
d.run
end
end
verify_index = 0
verify_log_entries(emit_index, COMPUTE_PARAMS) do |entry|
- assert_equal_with_default entry['metadata']['timestamp']['seconds'],
+ assert_equal_with_default entry['timestamp']['seconds'],
expected_ts[verify_index].tv_sec, 0, entry
- assert_equal_with_default entry['metadata']['timestamp']['nanos'],
+ assert_equal_with_default entry['timestamp']['nanos'],
expected_ts[verify_index].tv_nsec, 0, entry do
# Fluentd v0.14 onwards supports nanosecond timestamp values.
# Added in 600 ns delta to avoid flaky tests introduced
# due to rounding error in double-precision floating-point numbers
# (to account for the missing 9 bits of precision ~ 512 ns).
# See http://wikipedia.org/wiki/Double-precision_floating-point_format
assert_in_delta expected_ts[verify_index].tv_nsec,
- entry['metadata']['timestamp']['nanos'], 600, entry
+ entry['timestamp']['nanos'], 600, entry
end
verify_index += 1
end
end
def test_malformed_timestamp
setup_gce_metadata_stubs
setup_logging_stubs do
d = create_driver
- # if timestamp is not a hash it is passed through to the struct payload.
+ # if timestamp is not a hash it is passed through to the json payload.
d.emit('message' => log_entry(0), 'timestamp' => 'not-a-hash')
d.run
end
- verify_log_entries(1, COMPUTE_PARAMS, 'structPayload') do |entry|
- fields = get_fields(entry['structPayload'])
+ verify_log_entries(1, COMPUTE_PARAMS, 'jsonPayload') do |entry|
+ fields = get_fields(entry['jsonPayload'])
assert_equal 2, fields.size, entry
assert_equal 'not-a-hash', get_string(fields['timestamp']), entry
end
end
@@ -935,11 +1024,11 @@
"label_number_two": "foo.googleapis.com/bar",
"label3": "label3"
}
)
d = create_driver(config)
- # not_a_label passes through to the struct payload
+ # not_a_label passes through to the json payload
d.emit('message' => log_entry(0),
'label1' => 'value1',
'label_number_two' => 'value2',
'not_a_label' => 'value4',
'label3' => 'value3')
@@ -948,12 +1037,12 @@
# make a deep copy of COMPUTE_PARAMS and add the parsed labels.
params = Marshal.load(Marshal.dump(COMPUTE_PARAMS))
params[:labels]['sent_label_1'] = 'value1'
params[:labels]['foo.googleapis.com/bar'] = 'value2'
params[:labels]['label3'] = 'value3'
- verify_log_entries(1, params, 'structPayload') do |entry|
- fields = get_fields(entry['structPayload'])
+ verify_log_entries(1, params, 'jsonPayload') do |entry|
+ fields = get_fields(entry['jsonPayload'])
assert_equal 2, fields.size, entry
assert_equal 'test log entry 0', get_string(fields['message']), entry
assert_equal 'value4', get_string(fields['not_a_label']), entry
end
end
@@ -1021,15 +1110,13 @@
d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG)
d.emit(container_log_entry_with_metadata(log_entry(0)))
d.run
end
verify_log_entries(1, CONTAINER_FROM_METADATA_PARAMS) do |entry|
- assert_equal CONTAINER_SECONDS_EPOCH, \
- entry['metadata']['timestamp']['seconds'], entry
- assert_equal CONTAINER_NANOS, \
- entry['metadata']['timestamp']['nanos'], entry
- assert_equal CONTAINER_SEVERITY, entry['metadata']['severity'], entry
+ assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], entry
+ assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry
+ assert_equal CONTAINER_SEVERITY, entry['severity'], entry
end
end
def test_multiple_container_logs_metadata_from_plugin
setup_gce_metadata_stubs
@@ -1043,15 +1130,14 @@
d.instance_variable_get('@entries').clear
n.times { |i| d.emit(container_log_entry_with_metadata(log_entry(i))) }
d.run
end
verify_log_entries(n, CONTAINER_FROM_METADATA_PARAMS) do |entry|
- assert_equal CONTAINER_SECONDS_EPOCH, \
- entry['metadata']['timestamp']['seconds'], entry
- assert_equal CONTAINER_NANOS, \
- entry['metadata']['timestamp']['nanos'], entry
- assert_equal CONTAINER_SEVERITY, entry['metadata']['severity'], entry
+ assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'],
+ entry
+ assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry
+ assert_equal CONTAINER_SEVERITY, entry['severity'], entry
end
end
end
def test_multiple_container_logs_metadata_from_tag
@@ -1066,15 +1152,14 @@
d.instance_variable_get('@entries').clear
n.times { |i| d.emit(container_log_entry(log_entry(i))) }
d.run
end
verify_log_entries(n, CONTAINER_FROM_TAG_PARAMS) do |entry|
- assert_equal CONTAINER_SECONDS_EPOCH, \
- entry['metadata']['timestamp']['seconds'], entry
- assert_equal CONTAINER_NANOS, \
- entry['metadata']['timestamp']['nanos'], entry
- assert_equal CONTAINER_SEVERITY, entry['metadata']['severity'], entry
+ assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'],
+ entry
+ assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry
+ assert_equal CONTAINER_SEVERITY, entry['severity'], entry
end
end
end
def test_one_container_log_metadata_from_tag
@@ -1084,15 +1169,13 @@
d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG)
d.emit(container_log_entry(log_entry(0)))
d.run
end
verify_log_entries(1, CONTAINER_FROM_TAG_PARAMS) do |entry|
- assert_equal CONTAINER_SECONDS_EPOCH, \
- entry['metadata']['timestamp']['seconds'], entry
- assert_equal CONTAINER_NANOS, \
- entry['metadata']['timestamp']['nanos'], entry
- assert_equal CONTAINER_SEVERITY, entry['metadata']['severity'], entry
+ assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], entry
+ assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry
+ assert_equal CONTAINER_SEVERITY, entry['severity'], entry
end
end
def test_one_container_log_from_tag_stderr
setup_gce_metadata_stubs
@@ -1101,68 +1184,62 @@
d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG)
d.emit(container_log_entry(log_entry(0), 'stderr'))
d.run
end
expected_params = CONTAINER_FROM_TAG_PARAMS.merge(
- labels: { "#{CONTAINER_SERVICE_NAME}/stream" => 'stderr' }
+ labels: { "#{CONTAINER_CONSTANTS[:service]}/stream" => 'stderr' }
) { |_, oldval, newval| oldval.merge(newval) }
verify_log_entries(1, expected_params) do |entry|
- assert_equal CONTAINER_SECONDS_EPOCH, \
- entry['metadata']['timestamp']['seconds'], entry
- assert_equal CONTAINER_NANOS, \
- entry['metadata']['timestamp']['nanos'], entry
- assert_equal 'ERROR', entry['metadata']['severity'], entry
+ assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], entry
+ assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry
+ assert_equal 'ERROR', entry['severity'], entry
end
end
- def test_struct_container_log_metadata_from_plugin
+ def test_json_container_log_metadata_from_plugin
setup_gce_metadata_stubs
setup_container_metadata_stubs
setup_logging_stubs do
d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG)
d.emit(container_log_entry_with_metadata('{"msg": "test log entry 0", ' \
'"tag2": "test", "data": ' \
'5000, "severity": "WARNING"}'))
d.run
end
verify_log_entries(1, CONTAINER_FROM_METADATA_PARAMS,
- 'structPayload') do |entry|
- fields = get_fields(entry['structPayload'])
+ 'jsonPayload') do |entry|
+ fields = get_fields(entry['jsonPayload'])
assert_equal 3, fields.size, entry
assert_equal 'test log entry 0', get_string(fields['msg']), entry
assert_equal 'test', get_string(fields['tag2']), entry
assert_equal 5000, get_number(fields['data']), entry
- assert_equal CONTAINER_SECONDS_EPOCH, \
- entry['metadata']['timestamp']['seconds'], entry
- assert_equal CONTAINER_NANOS, \
- entry['metadata']['timestamp']['nanos'], entry
- assert_equal 'WARNING', entry['metadata']['severity'], entry
+ assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], entry
+ assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry
+ assert_equal 'WARNING', entry['severity'], entry
end
end
- def test_struct_container_log_metadata_from_tag
+ def test_json_container_log_metadata_from_tag
setup_gce_metadata_stubs
setup_container_metadata_stubs
setup_logging_stubs do
d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG)
d.emit(container_log_entry('{"msg": "test log entry 0", ' \
'"tag2": "test", "data": 5000, ' \
'"severity": "W"}'))
d.run
end
verify_log_entries(1, CONTAINER_FROM_TAG_PARAMS,
- 'structPayload') do |entry|
- fields = get_fields(entry['structPayload'])
+ 'jsonPayload') do |entry|
+ fields = get_fields(entry['jsonPayload'])
assert_equal 3, fields.size, entry
assert_equal 'test log entry 0', get_string(fields['msg']), entry
assert_equal 'test', get_string(fields['tag2']), entry
assert_equal 5000, get_number(fields['data']), entry
- assert_equal CONTAINER_SECONDS_EPOCH, \
- entry['metadata']['timestamp']['seconds'], entry
- assert_equal CONTAINER_NANOS, \
- entry['metadata']['timestamp']['nanos'], entry
- assert_equal 'WARNING', entry['metadata']['severity'], entry
+ assert_equal CONTAINER_SECONDS_EPOCH, entry['timestamp']['seconds'], entry
+ assert_equal CONTAINER_NANOS, entry['timestamp']['nanos'], entry
+ assert_equal 'WARNING', entry['severity'], entry
end
end
def test_cloudfunctions_log
setup_gce_metadata_stubs
@@ -1176,11 +1253,11 @@
@logs_sent = []
n.times { |i| d.emit(cloudfunctions_log_entry(i)) }
d.run
end
verify_log_entries(n, CLOUDFUNCTIONS_PARAMS) do |entry|
- assert_equal 'DEBUG', entry['metadata']['severity'],
+ assert_equal 'DEBUG', entry['severity'],
"Test with #{n} logs failed. \n#{entry}"
end
end
end
@@ -1197,11 +1274,11 @@
n.times { |i| d.emit(cloudfunctions_log_entry_text_not_matched(i)) }
d.run
end
verify_log_entries(
n, CLOUDFUNCTIONS_TEXT_NOT_MATCHED_PARAMS) do |entry|
- assert_equal 'INFO', entry['metadata']['severity'],
+ assert_equal 'INFO', entry['severity'],
"Test with #{n} logs failed. \n#{entry}"
end
end
end
@@ -1230,30 +1307,30 @@
def test_http_request_from_record
setup_gce_metadata_stubs
setup_logging_stubs do
d = create_driver
- d.emit('httpRequest' => http_request_message)
+ d.emit('httpRequest' => HTTP_REQUEST_MESSAGE)
d.run
end
verify_log_entries(1, COMPUTE_PARAMS, 'httpRequest') do |entry|
- assert_equal http_request_message, entry['httpRequest'], entry
- assert_nil get_fields(entry['structPayload'])['httpRequest'], entry
+ assert_equal HTTP_REQUEST_MESSAGE, entry['httpRequest'], entry
+ assert_nil get_fields(entry['jsonPayload'])['httpRequest'], entry
end
end
def test_http_request_partial_from_record
setup_gce_metadata_stubs
setup_logging_stubs do
d = create_driver
- d.emit('httpRequest' => http_request_message.merge(
+ d.emit('httpRequest' => HTTP_REQUEST_MESSAGE.merge(
'otherKey' => 'value'))
d.run
end
verify_log_entries(1, COMPUTE_PARAMS, 'httpRequest') do |entry|
- assert_equal http_request_message, entry['httpRequest'], entry
- fields = get_fields(entry['structPayload'])
+ assert_equal HTTP_REQUEST_MESSAGE, entry['httpRequest'], entry
+ fields = get_fields(entry['jsonPayload'])
request = get_fields(get_struct(fields['httpRequest']))
assert_equal 'value', get_string(request['otherKey']), entry
end
end
@@ -1262,24 +1339,19 @@
setup_logging_stubs do
d = create_driver
d.emit('httpRequest' => 'a_string')
d.run
end
- verify_log_entries(1, COMPUTE_PARAMS, 'structPayload') do |entry|
- fields = get_fields(entry['structPayload'])
+ verify_log_entries(1, COMPUTE_PARAMS, 'jsonPayload') do |entry|
+ fields = get_fields(entry['jsonPayload'])
assert_equal 'a_string', get_string(fields['httpRequest']), entry
assert_nil entry['httpRequest'], entry
end
end
private
- def uri_for_log(params)
- 'https://logging.googleapis.com/v1beta3/projects/' + params[:project_id] +
- '/logs/' + params[:log_name] + '/entries:write'
- end
-
def stub_metadata_request(metadata_path, response_body)
stub_request(:get, 'http://169.254.169.254/computeMetadata/v1/' +
metadata_path)
.to_return(body: response_body, status: 200,
headers: { 'Content-Length' => response_body.length })
@@ -1419,62 +1491,80 @@
stream: 'stdout',
log: log_entry(i)
}
end
+ def dataflow_log_entry(i)
+ {
+ step: DATAFLOW_STEP_ID,
+ message: log_entry(i)
+ }
+ end
+
+ def ml_log_entry(i)
+ {
+ name: ML_LOG_AREA,
+ message: log_entry(i)
+ }
+ end
+
def log_entry(i)
'test log entry ' + i.to_s
end
- def check_labels(entry, common_labels, expected_labels)
- # TODO(salty) test/handle overlap between common_labels and entry labels
- all_labels ||= common_labels
- all_labels.merge!(entry['metadata']['labels'] || {})
- all_labels.each do |key, value|
+ def check_labels(labels, expected_labels)
+ labels.each do |key, value|
assert value.is_a?(String), "Value #{value} for label #{key} " \
'is not a string: ' + value.class.name
assert expected_labels.key?(key), "Unexpected label #{key} => #{value}"
assert_equal expected_labels[key], value, 'Value mismatch - expected ' \
"#{expected_labels[key]} in #{key} => #{value}"
end
- assert_equal expected_labels.length, all_labels.length, 'Expected ' \
- "#{expected_labels.length} labels, got #{all_labels.length}"
+ assert_equal expected_labels.length, labels.length, 'Expected ' \
+ "#{expected_labels.length} labels: #{expected_labels}, got " \
+ "#{labels.length} labels: #{labels}"
end
# The caller can optionally provide a block which is called for each entry.
def verify_json_log_entries(n, params, payload_type = 'textPayload')
i = 0
- @logs_sent.each do |batch|
- batch['entries'].each do |entry|
+ @logs_sent.each do |request|
+ request['entries'].each do |entry|
unless payload_type.empty?
assert entry.key?(payload_type), 'Entry did not contain expected ' \
"#{payload_type} key: " + entry.to_s
# Check the payload for textPayload, otherwise it's up to the caller.
if payload_type == 'textPayload'
- assert_equal "test log entry #{i}", entry['textPayload'], batch
+ assert_equal "test log entry #{i}", entry['textPayload'], request
end
end
- assert_equal params[:zone], entry['metadata']['zone']
- assert_equal params[:service_name], entry['metadata']['serviceName']
- check_labels entry, batch['commonLabels'], params[:labels]
+ # per-entry resource or log_name overrides the corresponding field
+ # from the request. Labels are merged, with the per-entry label
+ # taking precedence in case of overlap.
+ resource = entry['resource'] || request['resource']
+ log_name = entry['logName'] || request['logName']
+
+ labels ||= request['labels']
+ labels.merge!(entry['labels'] || {})
+
+ assert_equal \
+ "projects/#{params[:project_id]}/logs/#{params[:log_name]}", log_name
+ assert_equal params[:resource][:type], resource['type']
+ check_labels resource['labels'], params[:resource][:labels]
+ check_labels labels, params[:labels]
yield(entry) if block_given?
i += 1
assert i <= n, "Number of entries #{i} exceeds expected number #{n}"
end
end
assert i == n, "Number of entries #{i} does not match expected number #{n}"
end
- # The http request message to test against.
- def http_request_message
- HTTP_REQUEST_MESSAGE
- end
-
# Replace the 'referer' field with nil.
def http_request_message_with_nil_referer
- http_request_message.merge('referer' => nil)
+ HTTP_REQUEST_MESSAGE.merge('referer' => nil)
end
# This module expects the methods below to be overridden.
# Create a Fluentd output test driver with the Google Cloud Output plugin.
@@ -1501,11 +1591,11 @@
# a plain equal. e.g. assert_in_delta.
def assert_equal_with_default(_field, _expected_value, _default_value, _entry)
_undefined
end
- # Get the fields of the struct payload.
- def get_fields(_struct_payload)
+ # Get the fields of the payload.
+ def get_fields(_payload)
_undefined
end
# Get the value of a struct field.
def get_struct(_field)