test/plugin/base_test.rb in fluent-plugin-google-cloud-0.7.4 vs test/plugin/base_test.rb in fluent-plugin-google-cloud-0.7.5.pre.multiworkers

- old
+ new

@@ -624,11 +624,13 @@ %w(message log msg).each do |field| d.emit(PRESERVED_KEYS_MAP.merge(field => json_string)) end d.run end - verify_log_entries(3, COMPUTE_PARAMS, 'jsonPayload') do |entry| + expected_params = COMPUTE_PARAMS.merge( + labels: COMPUTE_PARAMS[:labels].merge(LABELS_MESSAGE)) + verify_log_entries(3, expected_params, 'jsonPayload') do |entry| fields = get_fields(entry['jsonPayload']) assert_equal 4, fields.size, entry assert_equal 'test log entry 0', get_string(fields['msg']), entry assert_equal 'test', get_string(fields['tag2']), entry assert_equal 5000, get_number(fields['data']), entry @@ -1247,61 +1249,91 @@ def test_log_entry_http_request_field_from_record verify_subfields_from_record(DEFAULT_HTTP_REQUEST_KEY) end - def test_log_entry_source_location_field_from_record - verify_subfields_from_record(DEFAULT_SOURCE_LOCATION_KEY) + def test_log_entry_labels_field_from_record + verify_subfields_from_record(DEFAULT_LABELS_KEY, false) end def test_log_entry_operation_field_from_record verify_subfields_from_record(DEFAULT_OPERATION_KEY) end + def test_log_entry_source_location_field_from_record + verify_subfields_from_record(DEFAULT_SOURCE_LOCATION_KEY) + end + # Verify the subfields extraction of LogEntry fields when there are other # fields. def test_log_entry_http_request_field_partial_from_record verify_subfields_partial_from_record(DEFAULT_HTTP_REQUEST_KEY) end - def test_log_entry_source_location_field_partial_from_record - verify_subfields_partial_from_record(DEFAULT_SOURCE_LOCATION_KEY) - end + # We don't need a test like 'test_log_entry_labels_field_partial_from_record' + # because labels are free range strings. Everything in the labels field should + # be in the resulting logEntry->labels field. There is no need to check + # partial transformation (aka, some 'labels' fields are extracted, while + # others are left as it is). def test_log_entry_operation_field_partial_from_record verify_subfields_partial_from_record(DEFAULT_OPERATION_KEY) end + def test_log_entry_source_location_field_partial_from_record + verify_subfields_partial_from_record(DEFAULT_SOURCE_LOCATION_KEY) + end + # Verify the subfields extraction of LogEntry fields when they are not hashes. def test_log_entry_http_request_field_when_not_hash - verify_subfields_when_not_hash(DEFAULT_HTTP_REQUEST_KEY) + # TODO(qingling128) On the next major after 0.7.4, make all logEntry + # subfields behave the same way: if the field is not in the correct format, + # log an error in the Fluentd log and remove this field from payload. This + # is the preferred behavior per PM decision. + verify_subfields_untouched_when_not_hash(DEFAULT_HTTP_REQUEST_KEY) end - def test_log_entry_source_location_field_when_not_hash - verify_subfields_when_not_hash(DEFAULT_SOURCE_LOCATION_KEY) + def test_log_entry_labels_field_when_not_hash + verify_subfields_removed_when_not_hash(DEFAULT_LABELS_KEY) end def test_log_entry_operation_field_when_not_hash - verify_subfields_when_not_hash(DEFAULT_OPERATION_KEY) + # TODO(qingling128) On the next major after 0.7.4, make all logEntry + # subfields behave the same way: if the field is not in the correct format, + # log an error in the Fluentd log and remove this field from payload. This + # is the preferred behavior per PM decision. + verify_subfields_untouched_when_not_hash(DEFAULT_OPERATION_KEY) end + def test_log_entry_source_location_field_when_not_hash + # TODO(qingling128) On the next major after 0.7.4, make all logEntry + # subfields behave the same way: if the field is not in the correct format, + # log an error in the Fluentd log and remove this field from payload. This + # is the preferred behavior per PM decision. + verify_subfields_untouched_when_not_hash(DEFAULT_SOURCE_LOCATION_KEY) + end + # Verify the subfields extraction of LogEntry fields when they are nil. def test_log_entry_http_request_field_when_nil verify_subfields_when_nil(DEFAULT_HTTP_REQUEST_KEY) end - def test_log_entry_source_location_field_when_nil - verify_subfields_when_nil(DEFAULT_SOURCE_LOCATION_KEY) + def test_log_entry_labels_field_when_nil + verify_subfields_when_nil(DEFAULT_LABELS_KEY) end def test_log_entry_operation_field_when_nil verify_subfields_when_nil(DEFAULT_OPERATION_KEY) end + def test_log_entry_source_location_field_when_nil + verify_subfields_when_nil(DEFAULT_SOURCE_LOCATION_KEY) + end + def test_http_request_from_record_with_referer_nil_or_absent setup_gce_metadata_stubs [ http_request_message_with_nil_referer, http_request_message_with_absent_referer @@ -1358,34 +1390,58 @@ end # Verify the default and customization of LogEntry field extraction key. def test_log_entry_insert_id_field - verify_field_key('insertId', DEFAULT_INSERT_ID_KEY, 'custom_insert_id_key', - CONFIG_CUSTOM_INSERT_ID_KEY_SPECIFIED, INSERT_ID) + verify_field_key('insertId', + default_key: DEFAULT_INSERT_ID_KEY, + custom_key: 'custom_insert_id_key', + custom_key_config: CONFIG_CUSTOM_INSERT_ID_KEY_SPECIFIED, + sample_value: INSERT_ID) end + def test_log_entry_labels_field + verify_field_key('labels', + default_key: DEFAULT_LABELS_KEY, + custom_key: 'custom_labels_key', + custom_key_config: CONFIG_CUSTOM_LABELS_KEY_SPECIFIED, + sample_value: COMPUTE_PARAMS[:labels].merge( + LABELS_MESSAGE), + default_value: COMPUTE_PARAMS[:labels]) + end + def test_log_entry_operation_field - verify_field_key('operation', DEFAULT_OPERATION_KEY, 'custom_operation_key', - CONFIG_CUSTOM_OPERATION_KEY_SPECIFIED, OPERATION_MESSAGE) + verify_field_key('operation', + default_key: DEFAULT_OPERATION_KEY, + custom_key: 'custom_operation_key', + custom_key_config: CONFIG_CUSTOM_OPERATION_KEY_SPECIFIED, + sample_value: OPERATION_MESSAGE) end def test_log_entry_source_location_field - verify_field_key('sourceLocation', DEFAULT_SOURCE_LOCATION_KEY, - 'custom_source_location_key', - CONFIG_CUSTOM_SOURCE_LOCATION_KEY_SPECIFIED, - source_location_message) + verify_field_key('sourceLocation', + default_key: DEFAULT_SOURCE_LOCATION_KEY, + custom_key: 'custom_source_location_key', + custom_key_config: \ + CONFIG_CUSTOM_SOURCE_LOCATION_KEY_SPECIFIED, + sample_value: source_location_message) end def test_log_entry_span_id_field - verify_field_key('spanId', DEFAULT_SPAN_ID_KEY, 'custom_span_id_key', - CONFIG_CUSTOM_SPAN_ID_KEY_SPECIFIED, SPAN_ID) + verify_field_key('spanId', + default_key: DEFAULT_SPAN_ID_KEY, + custom_key: 'custom_span_id_key', + custom_key_config: CONFIG_CUSTOM_SPAN_ID_KEY_SPECIFIED, + sample_value: SPAN_ID) end def test_log_entry_trace_field - verify_field_key('trace', DEFAULT_TRACE_KEY, 'custom_trace_key', - CONFIG_CUSTOM_TRACE_KEY_SPECIFIED, TRACE) + verify_field_key('trace', + default_key: DEFAULT_TRACE_KEY, + custom_key: 'custom_trace_key', + custom_key_config: CONFIG_CUSTOM_TRACE_KEY_SPECIFIED, + sample_value: TRACE) end # Verify the cascading JSON detection of LogEntry fields. def test_cascading_json_detection_with_log_entry_insert_id_field @@ -1393,10 +1449,20 @@ 'insertId', DEFAULT_INSERT_ID_KEY, root_level_value: INSERT_ID, nested_level_value: INSERT_ID2) end + def test_cascading_json_detection_with_log_entry_labels_field + verify_cascading_json_detection_with_log_entry_fields( + 'labels', DEFAULT_LABELS_KEY, + root_level_value: LABELS_MESSAGE, + nested_level_value: LABELS_MESSAGE2, + expected_value_from_root: COMPUTE_PARAMS[:labels].merge(LABELS_MESSAGE), + expected_value_from_nested: COMPUTE_PARAMS[:labels].merge( + LABELS_MESSAGE2)) + end + def test_cascading_json_detection_with_log_entry_operation_field verify_cascading_json_detection_with_log_entry_fields( 'operation', DEFAULT_OPERATION_KEY, root_level_value: OPERATION_MESSAGE, nested_level_value: OPERATION_MESSAGE2, @@ -1422,10 +1488,86 @@ 'trace', DEFAULT_TRACE_KEY, root_level_value: TRACE, nested_level_value: TRACE2) end + # Verify that labels present in multiple inputs respect the expected priority + # order: + # 1. Labels from the field "logging.googleapis.com/labels" in payload. + # 2. Labels from the config "label_map". + # 3. Labels from the config "labels". + def test_labels_order + [ + # Labels from the config "labels". + { + config: CONFIG_LABELS, + emitted_log: {}, + expected_labels: LABELS_FROM_LABELS_CONFIG + }, + # Labels from the config "label_map". + { + config: CONFIG_LABEL_MAP, + emitted_log: PAYLOAD_FOR_LABEL_MAP, + expected_labels: LABELS_FROM_LABEL_MAP_CONFIG + }, + # Labels from the field "logging.googleapis.com/labels" in payload. + { + config: APPLICATION_DEFAULT_CONFIG, + emitted_log: { DEFAULT_LABELS_KEY => LABELS_MESSAGE }, + expected_labels: LABELS_MESSAGE + }, + # All three types of labels that do not conflict. + { + config: CONFIG_LABLES_AND_LABLE_MAP, + emitted_log: PAYLOAD_FOR_LABEL_MAP.merge( + DEFAULT_LABELS_KEY => LABELS_MESSAGE), + expected_labels: LABELS_MESSAGE.merge(LABELS_FROM_LABELS_CONFIG).merge( + LABELS_FROM_LABEL_MAP_CONFIG) + }, + # labels from the config "labels" and "label_map" conflict. + { + config: CONFIG_LABLES_AND_LABLE_MAP_CONFLICTING, + emitted_log: PAYLOAD_FOR_LABEL_MAP_CONFLICTING, + expected_labels: LABELS_FROM_LABEL_MAP_CONFIG_CONFLICTING + }, + # labels from the config "labels" and labels from the field + # "logging.googleapis.com/labels" in payload conflict. + { + config: CONFIG_LABELS_CONFLICTING, + emitted_log: { DEFAULT_LABELS_KEY => LABELS_FROM_PAYLOAD_CONFLICTING }, + expected_labels: LABELS_FROM_PAYLOAD_CONFLICTING + }, + # labels from the config "label_map" and labels from the field + # "logging.googleapis.com/labels" in payload conflict. + { + config: CONFIG_LABEL_MAP_CONFLICTING, + emitted_log: PAYLOAD_FOR_LABEL_MAP_CONFLICTING.merge( + DEFAULT_LABELS_KEY => LABELS_FROM_PAYLOAD_CONFLICTING), + expected_labels: LABELS_FROM_PAYLOAD_CONFLICTING + }, + # All three types of labels conflict. + { + config: CONFIG_LABLES_AND_LABLE_MAP_CONFLICTING, + emitted_log: PAYLOAD_FOR_LABEL_MAP_CONFLICTING.merge( + DEFAULT_LABELS_KEY => LABELS_FROM_PAYLOAD_CONFLICTING), + expected_labels: LABELS_FROM_PAYLOAD_CONFLICTING + } + ].each do |test_params| + new_stub_context do + setup_gce_metadata_stubs + setup_logging_stubs do + d = create_driver(test_params[:config]) + d.emit({ 'message' => log_entry(0) }.merge(test_params[:emitted_log])) + d.run + end + expected_params = COMPUTE_PARAMS.merge( + labels: COMPUTE_PARAMS[:labels].merge(test_params[:expected_labels])) + verify_log_entries(1, expected_params) + end + end + end + # Metadata Agent related tests. # Test enable_metadata_agent not set or set to false. def test_configure_enable_metadata_agent_default_and_false setup_gce_metadata_stubs @@ -1646,10 +1788,70 @@ end end end end + # Test k8s_pod monitored resource including the fallback when Metadata Agent + # restarts. + def test_k8s_pod_monitored_resource_fallback + [ + { + config: APPLICATION_DEFAULT_CONFIG, + setup_metadata_agent_stub: true, + setup_k8s_stub: true, + log_entry: k8s_pod_log_entry(log_entry(0)), + expected_params: K8S_POD_PARAMS_FROM_LOCAL + }, + { + config: ENABLE_METADATA_AGENT_CONFIG, + setup_metadata_agent_stub: false, + setup_k8s_stub: true, + log_entry: k8s_pod_log_entry(log_entry(0)), + expected_params: K8S_POD_PARAMS_FROM_LOCAL + }, + { + config: CUSTOM_K8S_ENABLE_METADATA_AGENT_CONFIG, + setup_metadata_agent_stub: false, + setup_k8s_stub: false, + log_entry: k8s_pod_log_entry(log_entry(0)), + expected_params: K8S_POD_PARAMS_CUSTOM + }, + { + config: EMPTY_K8S_ENABLE_METADATA_AGENT_CONFIG, + setup_metadata_agent_stub: true, + setup_k8s_stub: true, + log_entry: k8s_pod_log_entry(log_entry(0)), + expected_params: K8S_POD_PARAMS + }, + { + config: ENABLE_METADATA_AGENT_CONFIG, + setup_metadata_agent_stub: true, + setup_k8s_stub: true, + log_entry: k8s_pod_log_entry(log_entry(0)), + expected_params: K8S_POD_PARAMS + } + ].each do |test_params| + new_stub_context do + setup_gce_metadata_stubs + setup_metadata_agent_stubs(test_params[:setup_metadata_agent_stub]) + setup_k8s_metadata_stubs(test_params[:setup_k8s_stub]) + setup_logging_stubs do + d = create_driver(test_params[:config]) + d.emit(test_params[:log_entry]) + d.run + end + verify_log_entries(1, test_params[:expected_params], + 'jsonPayload') do |entry| + fields = get_fields(entry['jsonPayload']) + assert_equal 2, fields.size, entry + assert_equal 'test log entry 0', get_string(fields['log']), entry + assert_equal K8S_STREAM, get_string(fields['stream']), entry + end + end + end + end + # Test k8s_node monitored resource including the fallback when Metadata Agent # restarts. def test_k8s_node_monitored_resource_fallback [ { @@ -2021,10 +2223,22 @@ time: K8S_TIMESTAMP, LOCAL_RESOURCE_ID_KEY => local_resource_id } end + def k8s_pod_log_entry(log) + { + log: log, + stream: K8S_STREAM, + time: K8S_TIMESTAMP, + LOCAL_RESOURCE_ID_KEY => + "#{K8S_POD_LOCAL_RESOURCE_ID_PREFIX}" \ + ".#{K8S_NAMESPACE_NAME}" \ + ".#{K8S_POD_NAME}" + } + end + def k8s_node_log_entry(log) { log: log, stream: K8S_STREAM, time: K8S_TIMESTAMP, @@ -2080,31 +2294,39 @@ def log_entry(i) "test log entry #{i}" end - def check_labels(labels, expected_labels) - return if labels.empty? && expected_labels.empty? - labels.each do |key, value| - assert value.is_a?(String), "Value #{value} for label #{key} " \ - 'is not a string: ' + value.class.name - assert expected_labels.key?(key), "Unexpected label #{key} => #{value}" - assert_equal expected_labels[key], value, 'Value mismatch - expected ' \ - "#{expected_labels[key]} in #{key} => #{value}" + # If check_exact_labels is true, assert 'labels' and 'expected_labels' match + # exactly. If check_exact_labels is false, assert 'labels' is a subset of + # 'expected_labels'. + def check_labels(expected_labels, labels, check_exact_labels = true) + return if expected_labels.empty? && labels.empty? + expected_labels.each do |expected_key, expected_value| + assert labels.key?(expected_key), "Expected label #{expected_key} not" \ + " found. Got labels: #{labels}." + actual_value = labels[expected_key] + assert actual_value.is_a?(String), 'Value for label' \ + " #{expected_key} is not a string: #{actual_value}." + assert_equal expected_value, actual_value, "Value for #{expected_key}" \ + " mismatch. Expected #{expected_value}. Got #{actual_value}" end - assert_equal expected_labels.length, labels.length, 'Expected ' \ - "#{expected_labels.length} labels: #{expected_labels}, got " \ - "#{labels.length} labels: #{labels}" + if check_exact_labels + assert_equal expected_labels.length, labels.length, 'Expected ' \ + "#{expected_labels.length} labels: #{expected_labels}, got " \ + "#{labels.length} labels: #{labels}" + end end def verify_default_log_entry_text(text, i, entry) assert_equal "test log entry #{i}", text, "Entry ##{i} had unexpected text: #{entry}" end # The caller can optionally provide a block which is called for each entry. - def verify_json_log_entries(n, params, payload_type = 'textPayload') + def verify_json_log_entries(n, params, payload_type = 'textPayload', + check_exact_entry_labels = true) entry_count = 0 @logs_sent.each do |request| request['entries'].each do |entry| unless payload_type.empty? assert entry.key?(payload_type), @@ -2125,12 +2347,14 @@ assert_equal \ "projects/#{params[:project_id]}/logs/#{params[:log_name]}", log_name end assert_equal params[:resource][:type], resource['type'] - check_labels resource['labels'], params[:resource][:labels] - check_labels labels, params[:labels] + check_labels params[:resource][:labels], resource['labels'] + + check_labels params[:labels], labels, check_exact_entry_labels + if block_given? yield(entry, entry_count) elsif payload_type == 'textPayload' # Check the payload for textPayload, otherwise it's up to the caller. verify_default_log_entry_text(entry['textPayload'], entry_count, @@ -2168,27 +2392,30 @@ # The keys are the names of fields in the payload that we are extracting # LogEntry info from. The values are lists of two elements: the name of # the subfield in LogEntry object and the expected value of that field. DEFAULT_HTTP_REQUEST_KEY => [ 'httpRequest', http_request_message], - DEFAULT_SOURCE_LOCATION_KEY => [ - 'sourceLocation', source_location_message], + DEFAULT_LABELS_KEY => [ + 'labels', COMPUTE_PARAMS[:labels].merge(LABELS_MESSAGE)], DEFAULT_OPERATION_KEY => [ - 'operation', OPERATION_MESSAGE] + 'operation', OPERATION_MESSAGE], + DEFAULT_SOURCE_LOCATION_KEY => [ + 'sourceLocation', source_location_message] } end - def verify_subfields_from_record(payload_key) + def verify_subfields_from_record(payload_key, check_exact_entry_labels = true) destination_key, payload_value = log_entry_subfields_params[payload_key] @logs_sent = [] setup_gce_metadata_stubs setup_logging_stubs do d = create_driver d.emit(payload_key => payload_value) d.run end - verify_log_entries(1, COMPUTE_PARAMS, destination_key) do |entry| + verify_log_entries(1, COMPUTE_PARAMS, destination_key, + check_exact_entry_labels) do |entry| assert_equal payload_value, entry[destination_key], entry fields = get_fields(entry['jsonPayload']) assert_nil fields[payload_key], entry end end @@ -2208,20 +2435,39 @@ request = get_fields(get_struct(fields[payload_key])) assert_equal 'value', get_string(request['otherKey']), entry end end - def verify_subfields_when_not_hash(payload_key) + def verify_subfields_removed_when_not_hash(payload_key) destination_key = log_entry_subfields_params[payload_key][0] @logs_sent = [] setup_gce_metadata_stubs setup_logging_stubs do d = create_driver d.emit(payload_key => 'a_string') d.run end verify_log_entries(1, COMPUTE_PARAMS, 'jsonPayload') do |entry| + # The malformed field has been removed from the payload. + assert_true get_fields(entry['jsonPayload']).empty?, entry + # No additional labels. + assert_equal COMPUTE_PARAMS[:labels].size, + entry[destination_key].size, entry + end + end + + def verify_subfields_untouched_when_not_hash(payload_key) + destination_key = log_entry_subfields_params[payload_key][0] + @logs_sent = [] + setup_gce_metadata_stubs + setup_logging_stubs do + d = create_driver + d.emit(payload_key => 'a_string') + d.run + end + verify_log_entries(1, COMPUTE_PARAMS, 'jsonPayload') do |entry| + # Verify that we leave the malformed field as it is. field = get_fields(entry['jsonPayload'])[payload_key] assert_equal 'a_string', get_string(field), entry assert_false entry.key?(destination_key), entry end end @@ -2237,11 +2483,17 @@ end verify_log_entries(1, COMPUTE_PARAMS, 'jsonPayload') do |entry| fields = get_fields(entry['jsonPayload']) assert_false fields.key?(payload_key), entry - assert_false entry.key?(destination_key), entry + if payload_key == DEFAULT_LABELS_KEY + # No additional labels. + assert_equal COMPUTE_PARAMS[:labels].size, + entry[destination_key].size, entry + else + assert_false entry.key?(destination_key), entry + end end end # Cascading JSON detection is only triggered when the record has one field # left with name "log", "message" or "msg". This test verifies additional @@ -2300,11 +2552,11 @@ @logs_sent = [] d = create_driver(DETECT_JSON_CONFIG) d.emit(input_log_entry) d.run end - verify_log_entries(1, COMPUTE_PARAMS, 'jsonPayload') do |entry| + verify_log_entries(1, COMPUTE_PARAMS, 'jsonPayload', false) do |entry| assert_equal expected_value, entry[log_entry_field], "Index #{index} failed. #{expected_value} is expected" \ " for #{log_entry_field} field." payload_fields = get_fields(entry['jsonPayload']) assert_equal structured_log_entry.size, payload_fields.size @@ -2313,21 +2565,26 @@ end end end end - def verify_field_key(log_entry_field, default_key, custom_key, - custom_key_config, sample_value) + def verify_field_key(log_entry_field, test_params) + default_key = test_params[:default_key] + custom_key = test_params[:custom_key] + custom_key_config = test_params[:custom_key_config] + sample_value = test_params[:sample_value] + default_value = test_params.fetch(:default_value, nil) + setup_gce_metadata_stubs message = log_entry(0) [ { # It leaves log entry field nil if no keyed value sent. driver_config: APPLICATION_DEFAULT_CONFIG, emitted_log: { 'msg' => message }, expected_payload: { 'msg' => message }, - expected_field_value: nil + expected_field_value: default_value }, { # By default, it sets log entry field via a default key. driver_config: APPLICATION_DEFAULT_CONFIG, emitted_log: { 'msg' => message, default_key => sample_value }, @@ -2344,20 +2601,20 @@ { # It doesn't set log entry field by default key if custom key specified. driver_config: custom_key_config, emitted_log: { 'msg' => message, default_key => sample_value }, expected_payload: { 'msg' => message, default_key => sample_value }, - expected_field_value: nil + expected_field_value: default_value } ].each do |input| setup_logging_stubs do @logs_sent = [] d = create_driver(input[:driver_config]) d.emit(input[:emitted_log]) d.run end - verify_log_entries(1, COMPUTE_PARAMS, 'jsonPayload') do |entry| + verify_log_entries(1, COMPUTE_PARAMS, 'jsonPayload', false) do |entry| assert_equal input[:expected_field_value], entry[log_entry_field], input payload_fields = get_fields(entry['jsonPayload']) assert_equal input[:expected_payload].size, payload_fields.size, input payload_fields.each do |key, value| assert_hash_equal_json(input[:expected_payload][key], value) @@ -2411,10 +2668,11 @@ _undefined end # Verify the number and the content of the log entries match the expectation. # The caller can optionally provide a block which is called for each entry. - def verify_log_entries(_n, _params, _payload_type = 'textPayload', &_block) + def verify_log_entries(_n, _params, _payload_type = 'textPayload', + _check_exact_entry_labels = true, &_block) _undefined end # For an optional field with default values, Protobuf omits the field when it # is deserialized to json. So we need to add an extra check for gRPC which