module Fluent module EC2Metadata def initialize super require 'net/http' require 'aws-sdk-ec2' require 'oj' end def configure(conf) super # directive @map = {} conf.elements.select { |element| element.name == 'record' }.each { |element| element.each_pair { |k, v| element.has_key?(k) # to suppress unread configuration warning @map[k] = v } } @placeholder_expander = PlaceholderExpander.new(log) # get metadata first and then setup a refresh thread @ec2_metadata = get_metadata_and_tags @refresh_thread = Thread.new { while true sleep @metadata_refresh_seconds @ec2_metadata = get_metadata_and_tags end } end private def get_metadata_and_tags metadata = {} set_metadata(metadata) set_tag(metadata) metadata end def set_metadata(ec2_metadata) instance_identity = Oj.load(get_dynamic_data("instance-identity/document")) ec2_metadata['account_id'] = instance_identity["accountId"] ec2_metadata['image_id'] = instance_identity["imageId"] ec2_metadata['instance_id'] = get_metadata('instance-id') ec2_metadata['instance_type'] = get_metadata('instance-type') ec2_metadata['availability_zone'] = get_metadata('placement/availability-zone') ec2_metadata['region'] = ec2_metadata['availability_zone'].chop ec2_metadata['private_ip'] = get_metadata('local-ipv4') ec2_metadata['mac'] = get_metadata('mac') begin ec2_metadata['vpc_id'] = get_metadata("network/interfaces/macs/#{ec2_metadata['mac']}/vpc-id") rescue ec2_metadata['vpc_id'] = nil log.info "ec2-metadata: 'vpc_id' is undefined #{ec2_metadata['instance_id']} is not in VPC}" end begin ec2_metadata['subnet_id'] = get_metadata("network/interfaces/macs/#{ec2_metadata['mac']}/subnet-id") rescue ec2_metadata['subnet_id'] = nil log.info "ec2-metadata: 'subnet_id' is undefined because #{ec2_metadata['instance_id']} is not in VPC}" end ec2_metadata end def get_dynamic_data(f) Net::HTTP.start('169.254.169.254') do |http| res = http.get("/latest/dynamic/#{f}", get_header()) raise Fluent::ConfigError, "ec2-dynamic-data: failed to get #{f}" unless res.is_a?(Net::HTTPSuccess) res.body end end def get_metadata(f) Net::HTTP.start('169.254.169.254') do |http| res = http.get("/latest/meta-data/#{f}", get_header()) raise Fluent::ConfigError, "ec2-metadata: failed to get #{f}" unless res.is_a?(Net::HTTPSuccess) res.body end end def get_header() if @imdsv2 Net::HTTP.start('169.254.169.254') do |http| res = http.put("/latest/api/token", '', { 'X-aws-ec2-metadata-token-ttl-seconds' => '300' }) raise Fluent::ConfigError, "ec2-metadata: failed to get token" unless res.is_a?(Net::HTTPSuccess) { 'X-aws-ec2-metadata-token' => res.body } end else {} end end def set_tag(ec2_metadata) if @map.values.any? { |v| v.match(/^\${tagset_/) } || @output_tag =~ /\${tagset_/ if @aws_key_id and @aws_sec_key ec2 = Aws::EC2::Client.new( region: ec2_metadata['region'], access_key_id: @aws_key_id, secret_access_key: @aws_sec_key, ) else ec2 = Aws::EC2::Client.new( region: ec2_metadata['region'], ) end response = ec2.describe_instances({ :instance_ids => [ec2_metadata['instance_id']] }) instance = response.reservations[0].instances[0] raise Fluent::ConfigError, "ec2-metadata: failed to get instance data #{response.pretty_inspect}" if instance.nil? instance.tags.each { |tag| ec2_metadata["tagset_#{tag.key.downcase}"] = tag.value } end end def modify_record(record, tag, tag_parts) placeholders = @placeholder_expander.prepare_placeholders(record, tag, tag_parts, @ec2_metadata) new_record = record.dup @map.each_pair { |k, v| new_record[k] = @placeholder_expander.expand(v, placeholders) } new_record end def modify(output_tag, record, tag, tag_parts) placeholders = @placeholder_expander.prepare_placeholders(record, tag, tag_parts, @ec2_metadata) new_tag = @placeholder_expander.expand(output_tag, placeholders) new_record = record.dup @map.each_pair { |k, v| new_record[k] = @placeholder_expander.expand(v, placeholders) } [new_tag, new_record] end class PlaceholderExpander def initialize(log) @log = log end # referenced https://github.com/fluent/fluent-plugin-rewrite-tag-filter # referenced https://github.com/sonots/fluent-plugin-record-reformer attr_reader :placeholders def prepare_placeholders(_record, tag, tag_parts, ec2_metadata) placeholders = { '${tag}' => tag, } size = tag_parts.size tag_parts.each_with_index { |t, idx| placeholders.store("${tag_parts[#{idx}]}", t) placeholders.store("${tag_parts[#{idx-size}]}", t) # support tag_parts[-1] } ec2_metadata.each { |k, v| placeholders.store("${#{k}}", v) } placeholders end def expand(str, placeholders) str.gsub(/(\${[a-z_:\-]+(\[-?[0-9]+\])?}|__[A-Z_]+__)/) { @log.warn "ec2-metadata: unknown placeholder `#{$1}` found in a tag `#{placeholders['${tag}']}`" unless placeholders.include?($1) placeholders[$1] } end end end end