require 'fluent_plugin_elb_access_log/version'

# Fluentd input plugin that periodically lists ELB access log objects in an S3
# bucket, downloads the ones it has not processed yet, and emits one record per
# log line under the configured tag.
#
# Progress is persisted in two local files so a restart resumes where it left off:
# * +tsfile_path+   - datetime of the most recently processed log object
# * +histfile_path+ - keys of recently processed objects (at most +history_length+)
class Fluent::ElbAccessLogInput < Fluent::Input
  Fluent::Plugin.register_input('elb_access_log', self)

  USER_AGENT_SUFFIX = "fluent-plugin-elb-access-log/#{FluentPluginElbAccessLog::VERSION}".freeze

  # Access log columns in file order, mapped to the conversion method applied
  # to the raw string value (nil keeps the string as-is).
  # http://docs.aws.amazon.com/ElasticLoadBalancing/latest/DeveloperGuide/access-log-collection.html#access-log-entry-format
  ACCESS_LOG_FIELDS = {
    'timestamp'                => nil,
    'elb'                      => nil,
    'client_port'              => nil,
    'backend_port'             => nil,
    'request_processing_time'  => :to_f,
    'backend_processing_time'  => :to_f,
    'response_processing_time' => :to_f,
    'elb_status_code'          => :to_i,
    'backend_status_code'      => :to_i,
    'received_bytes'           => :to_i,
    'sent_bytes'               => :to_i,
    'request'                  => nil,
  }.freeze

  # Fall back to the global logger / Fluent::Engine when the base class does
  # not define `log` / `router` (presumably older Fluentd versions).
  unless method_defined?(:log)
    define_method('log') { $log }
  end

  unless method_defined?(:router)
    define_method('router') { Fluent::Engine }
  end

  # Static credentials; used only when both are set.
  config_param :aws_key_id,        :string,  :default => nil
  config_param :aws_sec_key,       :string,  :default => nil
  # Shared-credentials profile (and optional credentials file) used when static keys are absent.
  config_param :profile,           :string,  :default => nil
  config_param :credentials_path,  :string,  :default => nil
  config_param :http_proxy,        :string,  :default => nil
  # Used to build the S3 key prefix AWSLogs/<account_id>/elasticloadbalancing/<region>/.
  config_param :account_id,        :string
  config_param :region,            :string
  config_param :s3_bucket,         :string
  # Optional prefix prepended to the AWSLogs/... key prefix.
  config_param :s3_prefix,         :string,  :default => nil
  config_param :tag,               :string,  :default => 'elb.access_log'
  config_param :tsfile_path,       :string,  :default => '/var/tmp/fluent-plugin-elb-access-log.ts'
  config_param :histfile_path,     :string,  :default => '/var/tmp/fluent-plugin-elb-access-log.history'
  # Polling interval of the S3 listing timer.
  config_param :interval,          :time,    :default => 300
  # Initial position (parsed with Time.parse); overrides the saved timestamp file.
  config_param :start_datetime,    :string,  :default => nil
  # Objects whose file datetime is <= (current timestamp - buffer_sec) are skipped.
  config_param :buffer_sec,        :time,    :default => 600
  # Maximum number of processed object keys remembered for de-duplication.
  config_param :history_length,    :integer, :default => 100
  # Emit only every Nth line of each log object (1 = every line).
  config_param :sampling_interval, :integer, :default => 1
  # Enables AWS SDK debug logging to the plugin logger's output.
  config_param :debug,             :bool,    :default => false

  def initialize
    super
    require 'csv'
    require 'fileutils'
    require 'logger'
    require 'time'
    require 'uri'
    require 'aws-sdk'
  end

  # Resolves the starting timestamp and loads the processed-key history.
  #
  # @param conf [Fluent::Config::Element] plugin configuration
  def configure(conf)
    super

    # Ensure both state files exist before they are read.
    FileUtils.touch(@tsfile_path)
    FileUtils.touch(@histfile_path)

    @start_datetime =
      if @start_datetime
        Time.parse(@start_datetime).utc
      else
        begin
          Time.parse(File.read(@tsfile_path)).utc
        rescue StandardError
          # Empty (first run) or unparsable timestamp file: start from now.
          Time.now.utc
        end
      end

    @history = load_history
  end

  # Builds the S3 client and starts a background Coolio loop with a repeating
  # timer that fetches new log objects and persists progress after each tick.
  def start
    super

    # Load client
    client

    @loop = Coolio::Loop.new
    timestamp = @start_datetime

    timer = TimerWatcher.new(@interval, true, log) do
      new_timestamp = fetch(timestamp)

      # Only ever move the saved position forward.
      if new_timestamp > timestamp
        save_timestamp(new_timestamp)
        timestamp = new_timestamp
      end

      # Keep only the newest history_length keys.
      if @history.length > @history_length
        @history.shift(@history.length - @history_length)
      end

      save_history
    end

    @loop.attach(timer)
    @thread = Thread.new(&method(:run))
  end

  # Stops the event loop, waits for the polling thread, then runs the base
  # class's shutdown.
  def shutdown
    @loop.stop
    @thread.join
    super
  end

  private

  # Thread body: runs the Coolio loop, logging (not re-raising) any error.
  def run
    @loop.run
  rescue => e
    log.error(e.message)
    log.error_backtrace(e.backtrace)
  end

  # Lists log objects under the prefixes for the day before, the day of, and
  # the day after +timestamp+, then emits every object that is:
  # * a .log file,
  # * newer than (timestamp - buffer_sec), and
  # * not already in the history.
  #
  # @param timestamp [Time] current position
  # @return [Time] datetime of the last object emitted, or +timestamp+ if none
  def fetch(timestamp)
    last_timestamp = timestamp
    threshold = timestamp - @buffer_sec

    prefixes(timestamp).each do |prefix|
      client.list_objects(:bucket => @s3_bucket, :prefix => prefix).each do |page|
        page.contents.each do |obj|
          _account_id, _logfile_const, _region, _elb_name, logfile_datetime, _ip, logfile_suffix = obj.key.split('_', 7)

          # Check the suffix before parsing the datetime, so an unrelated object
          # under the prefix cannot abort the whole fetch with a parse error.
          next unless logfile_suffix.to_s.end_with?('.log')

          begin
            logfile_datetime = Time.parse(logfile_datetime)
          rescue ArgumentError, TypeError => e
            log.warn("skipping #{obj.key}: #{e.message}")
            next
          end

          next if logfile_datetime <= threshold
          next if @history.include?(obj.key)

          access_log = client.get_object(bucket: @s3_bucket, key: obj.key).first.body.string
          emit_access_log(access_log)
          last_timestamp = logfile_datetime
          @history.push(obj.key)
        end
      end
    end

    last_timestamp
  end

  # @param timestamp [Time]
  # @return [Array<String>] S3 key prefixes (.../YYYY/MM/DD/) for the day
  #   before, the day of, and the day after +timestamp+
  def prefixes(timestamp)
    base_prefix = "AWSLogs/#{@account_id}/elasticloadbalancing/#{@region}/"
    base_prefix = "#{@s3_prefix}/#{base_prefix}" if @s3_prefix

    [timestamp - 86400, timestamp, timestamp + 86400].map do |date|
      base_prefix + date.strftime('%Y/%m/%d/')
    end
  end

  # Parses a space-separated access log body and emits one record per line.
  #
  # @param access_log [String] raw content of one log object
  def emit_access_log(access_log)
    access_log = sampling(access_log) if @sampling_interval > 1

    # Blank lines would otherwise become empty rows whose nil fields crash the
    # parsing below and abort the whole object.
    rows = CSV.parse(access_log, :col_sep => ' ', :skip_blanks => true)

    rows.each do |row|
      record = Hash[ACCESS_LOG_FIELDS.keys.zip(row)]

      ACCESS_LOG_FIELDS.each do |name, conv|
        record[name] = record[name].send(conv) if conv
      end

      split_address_port!(record, 'client')
      split_address_port!(record, 'backend')
      parse_request!(record)

      time = Time.parse(record['timestamp'])
      router.emit(@tag, time.to_i, record)
    end
  end

  # Keeps lines 0, n, 2n, ... (n = sampling_interval) of the log body.
  def sampling(access_log)
    access_log.split("\n").each_with_index.select {|row, i| (i % @sampling_interval).zero? }.map {|row, i| row }.join("\n")
  end

  # Splits "<address>:<port>" in record["<prefix>_port"] into
  # record[prefix] (address) and record["<prefix>_port"] (Integer port).
  def split_address_port!(record, prefix)
    address, port = record["#{prefix}_port"].split(':', 2)
    record[prefix] = address
    record["#{prefix}_port"] = port.to_i
  end

  # Splits record['request'] into method, URI and HTTP version, and adds the
  # parsed URI components as "request.uri.<component>" keys. A URI that cannot
  # be parsed is logged as a warning; the remaining fields are kept.
  def parse_request!(record)
    request = record['request']
    method, uri, http_version = request.split(' ', 3)

    record['request.method'] = method
    record['request.uri'] = uri
    record['request.http_version'] = http_version

    begin
      uri = URI.parse(uri)

      [:scheme, :userinfo, :host, :port, :registry, :path, :opaque, :query, :fragment].each do |key|
        record["request.uri.#{key}"] = uri.send(key)
      end
    rescue => e
      # Use the `log` accessor (not @log) so the $log fallback above also works.
      log.warn(e.message)
    end
  end

  # File.write rather than Kernel#open, which would spawn a process for a path
  # starting with '|'.
  def save_timestamp(timestamp)
    File.write(@tsfile_path, timestamp.to_s)
  end

  # @return [Array<String>] processed object keys, one per line of the history file
  def load_history
    File.read(@histfile_path).split("\n")
  end

  def save_history
    File.write(@histfile_path, @history.join("\n"))
  end

  # Memoized Aws::S3::Client.
  #
  # Credentials are chosen in this order:
  # 1. static keys (aws_key_id and aws_sec_key),
  # 2. otherwise the shared-credentials profile,
  # 3. otherwise the SDK's default resolution.
  def client
    return @client if @client

    options = {:user_agent_suffix => USER_AGENT_SUFFIX}
    options[:region] = @region if @region
    options[:http_proxy] = @http_proxy if @http_proxy

    if @aws_key_id and @aws_sec_key
      options[:access_key_id] = @aws_key_id
      options[:secret_access_key] = @aws_sec_key
    elsif @profile
      credentials_opts = {:profile_name => @profile}
      credentials_opts[:path] = @credentials_path if @credentials_path
      credentials = Aws::SharedCredentials.new(credentials_opts)
      options[:credentials] = credentials
    end

    if @debug
      options[:logger] = Logger.new(log.out)
      options[:log_level] = :debug
      #options[:http_wire_trace] = true
    end

    @client = Aws::S3::Client.new(options)
  end

  # Coolio timer that invokes the given block on each tick and logs, rather
  # than propagates, any error the block raises.
  class TimerWatcher < Coolio::TimerWatcher
    def initialize(interval, repeat, log, &callback)
      @callback = callback
      @log = log
      super(interval, repeat)
    end

    def on_timer
      @callback.call
    rescue => e
      @log.error(e.message)
      @log.error_backtrace(e.backtrace)
    end
  end # TimerWatcher
end # Fluent::ElbAccessLogInput