module Fluent class HttpRecordModifier < Filter Plugin.register_filter('http_record_modifier', self) def initialize require 'socket' require 'yajl' require 'net/http' require 'uri' super end #Based on Filter Record Tranformer that is built-in on Fluent config_param :remove_keys, :string, :default => nil config_param :keep_keys, :string, :default => nil config_param :method, :string, :default => :get config_param :rene_record, :bool, :default => false config_param :auto_typecast, :bool, :default => false # false for lower version compatibility config_param :endpoint_url, :string config_param :serializer, :string, :default => :form config_param :authentication, :string, :default => nil config_param :username, :string, :default => '' config_param :password, :string, :default => '' config_param :raise_on_error, :bool, :default => true config_param :cache, :bool, :default => true config_param :expire, :integer, :default => 600 #10 min config_param :renew_time_key, :string, :default => nil config_param :remove_time_key, :bool, :default => true def configure(conf) super @method ||= conf['method'] @map = {} # directive conf.elements.select { |element| element.name == 'record' }.each do |element| element.each_pair do |k, v| element.has_key?(k) # to suppress unread configuration warning @map[k] = parse_value(v) end end @maped_params = {} # directive conf.elements.select { |element| element.name == 'params' }.each do |element| element.each_pair do |k, v| element.has_key?(k) # to suppress unread configuration warning @maped_params[k] = parse_value(v) end end if @remove_keys @remove_keys = @remove_keys.split(',') end if @keep_keys raise Fluent::ConfigError, "`renew_record` must be true to use `keep_keys`" unless @renew_record @keep_keys = @keep_keys.split(',') end @request_cache = Cache.new(@cache, @expire) @placeholder_expander = PlaceholderExpander.new({ :log => log, :auto_typecast => @auto_typecast, }) @hostname = Socket.gethostname end def filter_stream(tag, es) new_es = MultiEventStream.new tag_parts = tag.split('.') tag_prefix = tag_prefix(tag_parts) tag_suffix = tag_suffix(tag_parts) placeholders = { 'tag' => tag, 'tag_parts' => tag_parts, 'tag_prefix' => tag_prefix, 'tag_suffix' => tag_suffix, 'hostname' => @hostname, } last_record = nil es.each do |time, record| last_record = record # for debug log req, uri = create_request(tag, time, record) body = @request_cache.get(uri.to_s) if body.nil? res = send_request(req, uri) body = deserialize_body(res) @request_cache.set(uri.to_s, body) end new_record = reform(time, record, placeholders, body) if @renew_time_key && new_record.has_key?(@renew_time_key) time = new_record[@renew_time_key].to_i if @remove_time_key new_record.delete(@renew_time_key) end end new_es.add(time, new_record) end new_es rescue => e log.warn "failed to reform records", :error_class => e.class, :error => e.message log.warn_backtrace log.debug "map:#{@map} record:#{last_record} placeholders:#{placeholders}" end def deserialize_body(res) clean = {} body = res.body if res.content_type == 'application/json' body = Yajl.load(body) if body.is_a? Hash clean = Yajl.load(res.body) end end clean['body'] = body clean end def format_params(tag, time, record) tag_parts = tag.split('.') tag_prefix = tag_prefix(tag_parts) tag_suffix = tag_suffix(tag_parts) placeholders = { 'tag' => tag, 'tag_parts' => tag_parts, 'tag_prefix' => tag_prefix, 'tag_suffix' => tag_suffix, 'hostname' => @hostname, } @placeholder_expander.prepare_placeholders(time, record, placeholders) expand_placeholders(@maped_params) end def set_body(req, tag, time, record) if @serializer == :json req['Content-Type'] = 'application/json' else req.set_form_data(record) end req end def create_request(tag, time, record) url = URI.encode(@endpoint_url.to_s) uri = URI.parse(url) params = format_params(tag, time, record) uri.query = URI.encode_www_form(params) req = Net::HTTP.const_get(@method.to_s.capitalize).new(uri) unless @method.to_s.capitalize == 'Get' set_body(req, tag, time, record) end return req, uri end def send_request(req, uri) res = nil begin if @auth and @auth == :basic req.basic_auth(@username, @password) end res = Net::HTTP.new(uri.host, uri.port).start {|http| http.request(req) } rescue => e # rescue all StandardErrors # server didn't respond $log.warn "Net::HTTP.#{req.method.capitalize} raises exception: #{e.class}, '#{e.message}'" raise e if @raise_on_error end # end begin end # end send_request private def parse_value(value_str) if value_str.start_with?('{', '[') JSON.parse(value_str) else value_str end rescue => e log.warn "failed to parse #{value_str} as json. Assuming #{value_str} is a string", :error_class => e.class, :error => e.message value_str # emit as string end def reform(time, record, opts, res) @placeholder_expander.prepare_placeholders(time, res, opts) new_record = @renew_record ? {} : record.dup @keep_keys.each {|k| new_record[k] = record[k]} if @keep_keys and @renew_record new_record.merge!(expand_placeholders(@map)) @remove_keys.each {|k| new_record.delete(k) } if @remove_keys new_record end def expand_placeholders(value) if value.is_a?(String) new_value = @placeholder_expander.expand(value) elsif value.is_a?(Hash) new_value = {} value.each_pair do |k, v| new_value[@placeholder_expander.expand(k, true)] = expand_placeholders(v) end elsif value.is_a?(Array) new_value = [] value.each_with_index do |v, i| new_value[i] = expand_placeholders(v) end else new_value = value end new_value end def tag_prefix(tag_parts) return [] if tag_parts.empty? tag_prefix = [tag_parts.first] 1.upto(tag_parts.size-1).each do |i| tag_prefix[i] = "#{tag_prefix[i-1]}.#{tag_parts[i]}" end tag_prefix end def tag_suffix(tag_parts) return [] if tag_parts.empty? rev_tag_parts = tag_parts.reverse rev_tag_suffix = [rev_tag_parts.first] 1.upto(tag_parts.size-1).each do |i| rev_tag_suffix[i] = "#{rev_tag_parts[i]}.#{rev_tag_suffix[i-1]}" end rev_tag_suffix.reverse! end class Cache def initialize(cache, expire) @data = {} @cache = cache @expire = expire end def get(key) unless @data.has_key?(key) and @cache return nil end if Time.now.to_i > @data[key]['time'] + @expire @data.delete(key) return nil end return @data[key]['value'] end def set(key, value) if @cache @data[key] = { 'time' => Time.now.to_i, 'value' => value } end end end class PlaceholderExpander attr_reader :placeholders, :log def initialize(params) @log = params[:log] @auto_typecast = params[:auto_typecast] end def prepare_placeholders(time, record, opts) placeholders = { '${time}' => Time.at(time).to_s } record.each {|key, value| crawl_placeholder(value, placeholders, "#{key}")} opts.each do |key, value| if value.kind_of?(Array) # tag_parts, etc size = value.size value.each_with_index { |v, idx| placeholders.store("${#{key}[#{idx}]}", v) placeholders.store("${#{key}[#{idx-size}]}", v) # support [-1] } else # string, interger, float, and others? placeholders.store("${#{key}}", value) end end @placeholders = placeholders end def crawl_placeholder (value, placeholder, before, limit = 50) if limit >= 0 if value.kind_of?(Hash) value.each {|key, v| crawl_placeholder(v, placeholder, "#{before}.#{key}", limit - 1)} elsif value.kind_of?(Array) # tag_parts, etc size = value.size value.each_with_index { |v, idx| crawl_placeholder(v, placeholder, "#{before}[#{idx}]", limit - 1) crawl_placeholder(v, placeholder, "#{before}[#{idx-size}]", limit - 1) #suport [-1] } end end # string, interger, float, and others? placeholder.store("${#{before}}", value) end def expand(str, force_stringify=false) if @auto_typecast and !force_stringify single_placeholder_matched = str.match(/\A(\${[^}]+}|__[A-Z_]+__)\z/) if single_placeholder_matched log_unknown_placeholder($1) return @placeholders[single_placeholder_matched[1]] end end str.gsub(/(\${[^}]+}|__[A-Z_]+__)/) { log_unknown_placeholder($1) @placeholders[$1] } end private def log_unknown_placeholder(placeholder) unless @placeholders.include?(placeholder) log.warn "unknown placeholder `#{placeholder}` found" end end end end end