# encoding: utf-8 require "logstash/codecs/base" require "logstash/namespace" require "logstash/codecs/json" require 'digest' class LogStash::Codecs::GoogleAppengine < LogStash::Codecs::Base config_name "google_appengine" public def register @json = LogStash::Codecs::JSON.new @md5 = Digest::MD5.new end def decode(data) begin @json.decode data do |json| if is_parse_failure json @logger.error("Failed to process data", :data => json) else flatten(json).each { |flattenedJson| yield LogStash::Event.new flattenedJson } end end rescue => e @logger.error "Failed to process data", :error => e, :data => data end end end private def is_parse_failure(event) event["tags"] && event["tags"].include?("_jsonparsefailure") end def flatten(event) payload = event['protoPayload'] lines = payload['line'] payload.delete '@type' if lines payload.delete 'line' lines.map.with_index { |line, i| merged = payload.merge line merged['id'] = @md5.hexdigest merged['requestId'] + i.to_s merged['message'] = merged.delete 'logMessage' merged } else payload['id'] = @md5.hexdigest payload['requestId'] payload['time'] = payload['endTime'] [payload] end end