require 'fluent/input'
require 'fluent_plugin_elb_access_log/version'

class Fluent::ElbAccessLogInput < Fluent::Input
  # Register this class with Fluentd under the input type name 'elb_access_log'.
  Fluent::Plugin.register_input('elb_access_log', self)

  # Passed to the AWS SDK client as :user_agent_suffix (see #client); embeds
  # the plugin version.
  USER_AGENT_SUFFIX = "fluent-plugin-elb-access-log/#{FluentPluginElbAccessLog::VERSION}"

  # http://docs.aws.amazon.com/ElasticLoadBalancing/latest/DeveloperGuide/access-log-collection.html#access-log-entry-format
  #
  # Ordered mapping of access-log column name => conversion method.
  # The key order is the column order of a log line (keys are zipped with the
  # parsed row); a nil value means the raw string is kept, otherwise the named
  # method (:to_i / :to_f) is sent to the raw value.
  # Frozen so that the shared lookup table cannot be modified at runtime.
  ACCESS_LOG_FIELDS = {
    'timestamp'                => nil,
    'elb'                      => nil,
    'client_port'              => nil,
    'backend_port'             => nil,
    'request_processing_time'  => :to_f,
    'backend_processing_time'  => :to_f,
    'response_processing_time' => :to_f,
    'elb_status_code'          => :to_i,
    'backend_status_code'      => :to_i,
    'received_bytes'           => :to_i,
    'sent_bytes'               => :to_i,
    'request'                  => nil,
    'user_agent'               => nil,
    'ssl_cipher'               => nil,
    'ssl_protocol'             => nil,
  }.freeze

  # Compatibility shim: when the base class does not already provide #log,
  # define it to return the global $log.
  unless method_defined?(:log)
    define_method('log') { $log }
  end

  # Compatibility shim: when the base class does not already provide #router,
  # define it to return Fluent::Engine (which is then used for emitting).
  unless method_defined?(:router)
    define_method('router') { Fluent::Engine }
  end

  # --- AWS credentials / connection (consumed by #client) ---
  # Static credentials are used when both aws_key_id and aws_sec_key are set;
  # otherwise, when profile is set, shared credentials are loaded for that
  # profile (optionally from credentials_path).
  config_param :aws_key_id,        :string,  :default => nil, :secret => true
  config_param :aws_sec_key,       :string,  :default => nil, :secret => true
  config_param :profile,           :string,  :default => nil
  config_param :credentials_path,  :string,  :default => nil
  config_param :http_proxy,        :string,  :default => nil
  # --- Location of the logs in S3 ---
  # account_id and region are interpolated into the key prefix built by
  # #prefixes; s3_prefix, when set, is prepended to that prefix.
  config_param :account_id,        :string
  config_param :region,            :string
  config_param :s3_bucket,         :string
  config_param :s3_prefix,         :string,  :default => nil
  # Tag used for every emitted record.
  config_param :tag,               :string,  :default => 'elb.access_log'
  # --- State files ---
  # tsfile_path stores the last saved timestamp; histfile_path stores the list
  # of already-processed S3 keys, one per line.
  config_param :tsfile_path,       :string,  :default => '/var/tmp/fluent-plugin-elb-access-log.ts'
  config_param :histfile_path,     :string,  :default => '/var/tmp/fluent-plugin-elb-access-log.history'
  # Interval handed to the polling timer in #start.
  config_param :interval,          :time,    :default => 300
  # Initial timestamp; only used when the tsfile does not hold a parsable time
  # (see #configure).
  config_param :start_datetime,    :string,  :default => nil
  # Subtracted from the current timestamp in #fetch: log files whose timestamp
  # is at or before (timestamp - buffer_sec) are skipped.
  config_param :buffer_sec,        :time,    :default => 600
  # Maximum number of processed keys retained in the history (see #start).
  config_param :history_length,    :integer, :default => 100
  # When greater than 1, only every Nth line of a log file is kept (see #sampling).
  config_param :sampling_interval, :integer, :default => 1
  # When true, a Logger writing to log.out is attached to the AWS client at
  # :debug level.
  config_param :debug,             :bool,    :default => false

  # Calls the parent initializer, then loads the libraries the plugin uses.
  # The requires are issued when an instance is created rather than when this
  # file is loaded.
  def initialize
    super
    require 'csv'              # CSV.parse_line in #parse_line
    require 'fileutils'        # FileUtils.touch in #configure
    require 'logger'           # Logger.new in #client (debug mode)
    require 'time'             # Time.parse
    require 'addressable/uri'  # Addressable::URI.parse in #parse_request!
    require 'aws-sdk'          # Aws::S3::Client / Aws::SharedCredentials in #client
  end

  # Applies the configuration, makes sure both state files exist, and decides
  # the timestamp polling starts from:
  #   1. the time stored in the tsfile, if it can be parsed (a warning is
  #      logged when start_datetime was also configured, since it is ignored);
  #   2. otherwise the configured start_datetime, parsed and converted to UTC;
  #   3. otherwise the current time in UTC.
  # Finally loads the history of already-processed keys.
  def configure(conf)
    super

    [@tsfile_path, @histfile_path].each { |state_file| FileUtils.touch(state_file) }
    tsfile_start_datetime = parse_tsfile

    if tsfile_start_datetime
      if @start_datetime
        log.warn("start_datetime(#{@start_datetime}) is set. but tsfile datetime(#{tsfile_start_datetime}) is used")
      end

      @start_datetime = tsfile_start_datetime
    elsif @start_datetime
      @start_datetime = Time.parse(@start_datetime).utc
    else
      @start_datetime = Time.now.utc
    end

    @history = load_history
  end

  # Starts polling: creates the event loop, attaches a repeating timer that
  # fetches new log files every @interval, and runs the loop in its own thread.
  #
  # On every tick the timer callback
  #   * fetches logs starting from the current timestamp,
  #   * persists and adopts the returned timestamp when it is newer,
  #   * trims the history to at most @history_length entries (oldest first),
  #   * writes the history file.
  def start
    super

    # Build the S3 client up front (#client memoizes it).
    client

    @loop = Coolio::Loop.new
    cursor = @start_datetime

    poller = TimerWatcher.new(@interval, true, log) do
      fetched_up_to = fetch(cursor)

      if fetched_up_to > cursor
        save_timestamp(fetched_up_to)
        cursor = fetched_up_to
      end

      overflow = @history.length - @history_length
      @history.shift(overflow) if overflow > 0

      save_history
    end

    @loop.attach(poller)
    @thread = Thread.new(&method(:run))
  end

  # Stops the event loop created in #start, then waits for the thread that is
  # running it to finish.
  def shutdown
    @loop.stop
    @thread.join
  end

  private

  # Thread body: runs the event loop until it stops. Any StandardError that
  # escapes the loop is logged (message and backtrace) instead of being raised.
  def run
    begin
      @loop.run
    rescue => err
      log.error(err.message)
      log.error_backtrace(err.backtrace)
    end
  end

  # Downloads and emits every not-yet-processed access-log file found under
  # the prefixes for +timestamp+ (see #prefixes).
  #
  # An object is processed when its key ends in ".log", the timestamp embedded
  # in its key is later than (timestamp - @buffer_sec), and the key is not in
  # @history. Processed keys are appended to @history.
  #
  # timestamp - Time to poll from.
  #
  # Returns the newest log-file timestamp that was processed, or +timestamp+
  # itself when nothing newer was processed. The result never moves backwards,
  # even when objects are listed in an order that is not chronological.
  def fetch(timestamp)
    last_timestamp = timestamp
    oldest_accepted = timestamp - @buffer_sec

    prefixes(timestamp).each do |prefix|
      client.list_objects(:bucket => @s3_bucket, :prefix => prefix).each do |page|
        page.contents.each do |obj|
          # Key layout: <account>_<const>_<region>_<elb name>_<end time>_<ip>_<suffix>
          _account_id, _logfile_const, _region, _elb_name, logfile_datetime, _ip, logfile_suffix = obj.key.split('_', 7)

          # Check the suffix first so objects that are not log files are
          # skipped without attempting to parse a timestamp out of them.
          next if logfile_suffix !~ /\.log\z/

          begin
            logfile_datetime = Time.parse(logfile_datetime)
          rescue ArgumentError, TypeError
            # One malformed key must not abort the whole polling cycle.
            log.warn("skipping S3 object with unparsable timestamp: #{obj.key}")
            next
          end

          next if logfile_datetime <= oldest_accepted
          next if @history.include?(obj.key)

          access_log = client.get_object(bucket: @s3_bucket, key: obj.key).body.string
          emit_access_log(access_log)
          # Keep the maximum: listing order is by key, not by time.
          last_timestamp = logfile_datetime if logfile_datetime > last_timestamp
          @history.push(obj.key)
        end
      end
    end

    last_timestamp
  end

  # Builds the S3 key prefixes to scan for +timestamp+: one per day for the
  # previous day, the same day and the next day, in that order.
  #
  # Each prefix has the form
  #   [<s3_prefix>/]AWSLogs/<account_id>/elasticloadbalancing/<region>/YYYY/MM/DD/
  #
  # Returns an Array of three Strings.
  def prefixes(timestamp)
    root = "AWSLogs/#{@account_id}/elasticloadbalancing/#{@region}/"
    root = "#{@s3_prefix}/#{root}" if @s3_prefix

    one_day = 86400

    [-one_day, 0, one_day].map do |offset|
      day = timestamp + offset
      root + day.strftime('%Y/%m/%d/')
    end
  end

  # Parses the text of one access-log file and emits one event per log line.
  #
  # Steps:
  #   1. optionally down-sample the lines (when @sampling_interval > 1);
  #   2. join quoted fields that span several lines, then parse every line
  #      (lines for which #parse_line returns nothing are dropped);
  #   3. for every parsed row, build a record keyed by ACCESS_LOG_FIELDS,
  #      apply the per-field conversions, split the client/backend
  #      "address:port" values, expand the request, and emit it under @tag.
  #
  # The event time is taken from the record's 'timestamp' field; the current
  # time is used when that field cannot be parsed.
  def emit_access_log(access_log)
    access_log = sampling(access_log) if @sampling_interval > 1

    # All lines are parsed before the first record is emitted.
    rows = normalize_line_feeds(access_log).split("\n").map { |raw| parse_line(raw) }.select { |fields| fields }

    field_names = ACCESS_LOG_FIELDS.keys

    rows.each do |fields|
      record = Hash[field_names.zip(fields)]

      ACCESS_LOG_FIELDS.each_pair do |field, converter|
        next unless converter
        record[field] = record[field].send(converter)
      end

      %w(client backend).each { |side| split_address_port!(record, side) }

      parse_request!(record)

      event_time = begin
        Time.parse(record['timestamp'])
      rescue
        Time.now
      end

      router.emit(@tag, event_time.to_i, record)
    end
  end

  # Splits one access-log line into an Array of raw field strings.
  #
  # The line is first parsed as space-separated CSV. When that raises (for
  # example because a quoted field contains an unescaped double quote), a
  # hand-written fallback is used:
  #   * the first 11 fields are taken by splitting on whitespace;
  #   * the request is the text between the opening quote and the next quote;
  #   * of the remainder, a quoted user agent runs from its opening quote to
  #     the LAST quote (so it may contain spaces and quotes), followed by
  #     ssl_cipher and ssl_protocol; an unquoted remainder is split on
  #     whitespace into those three fields.
  #
  # When the request has no closing quote, the line cannot be recovered: a
  # warning carrying the CSV error is logged and the partially filled array
  # (12 elements) is returned.
  #
  # line - String holding a single log line.
  #
  # Returns an Array of Strings, or nil when the CSV parser returns nil.
  def parse_line(line)
    parsed = nil

    begin
      parsed = CSV.parse_line(line, :col_sep => ' ')
    rescue => e
      begin
        parsed = line.split(' ', 12)

        # Element 11 holds everything from the request onwards.
        tail = (parsed[11] || '').sub(/\A"/, '')
        request, closing_quote, rest = tail.partition('"')
        parsed[11] = request

        if closing_quote.empty?
          log.warn("#{e.message}: #{line}")
        else
          rest = rest.strip

          if rest.start_with?('"') && rest.rindex('"') > 0
            # Quoted user agent: take everything up to the last quote so that
            # embedded spaces and quotes stay inside the field.
            last_quote = rest.rindex('"')
            user_agent = rest[1...last_quote]
            ssl_cipher, ssl_protocol = rest[(last_quote + 1)..-1].split(' ', 2)
          else
            user_agent, ssl_cipher, ssl_protocol = rest.split(' ', 3)
            # Non-destructive sub: the bang variants return nil when nothing
            # was replaced and therefore cannot be chained.
            user_agent = user_agent.sub(/\A"/, '').sub(/"\z/, '') if user_agent
          end

          parsed[12] = user_agent
          parsed[13] = ssl_cipher
          parsed[14] = ssl_protocol
        end
      rescue
        # Best effort: report the original CSV error and return what we have.
        log.warn("#{e.message}: #{line}")
      end
    end

    parsed
  end

  # This method is required because the user-agent field is sometimes split
  # across several lines, as in the following example:
  # 2017-06-07T22:47:14.827494Z baby 162.243.126.163:37036 10.6.49.1:80 0.000042 0.004133 0.00002 301 301 0 232 "GET http://example.com:80/ HTTP/1.0" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133
  #          Safari/537.36
  #" - -
  # Returns a copy of +str+ in which every line feed that occurs while inside
  # a double-quoted section is replaced by a single space; all other
  # characters are copied unchanged.
  #
  # A double quote toggles the "inside quotes" state unless the character
  # immediately before it is also a double quote. The toggle is applied after
  # the current character has been copied.
  def normalize_line_feeds(str)
    inside_quotes = false
    last_char = nil

    str.each_char.each_with_object('') do |ch, out|
      out << (inside_quotes && ch == "\n" ? ' ' : ch)

      inside_quotes = !inside_quotes if ch == '"' && last_char != '"'
      last_char = ch
    end
  end

  # Keeps every @sampling_interval-th line of +access_log+, starting with the
  # first one, and returns the kept lines joined with "\n".
  def sampling(access_log)
    rows = access_log.split("\n")
    kept = rows.select.with_index { |_row, position| (position % @sampling_interval).zero? }
    kept.join("\n")
  end

  # Splits the "address:port" string stored under "<prefix>_port" in place:
  # the address goes to record[prefix] and "<prefix>_port" is overwritten with
  # the port converted by to_i. Does nothing when the field is missing.
  #
  # record - Hash being built for one log line (modified in place).
  # prefix - 'client' or 'backend'.
  def split_address_port!(record, prefix)
    port_key = "#{prefix}_port"
    combined = record[port_key]
    return unless combined

    host, port_number = combined.split(':', 2)
    record[prefix] = host
    record[port_key] = port_number.to_i
  end

  # Expands record['request'] ("METHOD URI HTTP_VERSION") in place.
  #
  # Adds 'request.method', 'request.uri' and 'request.http_version', and, when
  # the URI can be parsed, one 'request.uri.<part>' entry for each of scheme,
  # user, host, port, path, query and fragment. Does nothing when the record
  # has no request.
  #
  # A failure while parsing the URI is logged as a warning through #log (the
  # same accessor used elsewhere in this class) and is not raised.
  #
  # record - Hash being built for one log line (modified in place).
  def parse_request!(record)
    request = record['request']
    return unless request

    http_method, raw_uri, http_version = request.split(' ', 3)

    record['request.method'] = http_method
    record['request.uri'] = raw_uri
    record['request.http_version'] = http_version

    begin
      parsed_uri = Addressable::URI.parse(raw_uri)

      if parsed_uri
        [:scheme, :user, :host, :port, :path, :query, :fragment].each do |key|
          record["request.uri.#{key}"] = parsed_uri.send(key)
        end
      end
    rescue => e
      log.warn("#{e.message}: #{raw_uri}")
    end
  end

  # Overwrites the timestamp file with the string form of +timestamp+.
  #
  # File.write is used rather than Kernel#open so that the configured path is
  # always treated as a plain file name.
  def save_timestamp(timestamp)
    File.write(@tsfile_path, timestamp.to_s)
  end

  # Reads the history file and returns its contents split on "\n", i.e. the
  # list of S3 keys that were already processed (one per line).
  def load_history
    contents = File.read(@histfile_path)
    contents.split("\n")
  end

  # Overwrites the history file with the processed keys, one per line
  # (joined with "\n", no trailing newline).
  #
  # File.write is used rather than Kernel#open so that the configured path is
  # always treated as a plain file name.
  def save_history
    File.write(@histfile_path, @history.join("\n"))
  end

  # Reads the timestamp file and returns its contents as a UTC Time.
  # Returns nil when the file cannot be read or does not contain a parsable
  # time (any StandardError is treated as "no timestamp").
  def parse_tsfile
    begin
      raw = File.read(@tsfile_path)
      Time.parse(raw).utc
    rescue
      nil
    end
  end

  # Returns the S3 client, building it on first use and reusing it afterwards.
  #
  # Options passed to Aws::S3::Client:
  #   * :user_agent_suffix - always;
  #   * :region, :http_proxy - when configured;
  #   * :access_key_id / :secret_access_key - when both key settings are
  #     present; otherwise :credentials from the shared credentials of the
  #     configured profile (optionally read from credentials_path);
  #   * :logger / :log_level - in debug mode, a Logger writing to log.out at
  #     :debug level.
  def client
    @client ||= begin
      settings = {:user_agent_suffix => USER_AGENT_SUFFIX}
      settings[:region] = @region if @region
      settings[:http_proxy] = @http_proxy if @http_proxy

      if @aws_key_id && @aws_sec_key
        settings[:access_key_id] = @aws_key_id
        settings[:secret_access_key] = @aws_sec_key
      elsif @profile
        shared = {:profile_name => @profile}
        shared[:path] = @credentials_path if @credentials_path
        settings[:credentials] = Aws::SharedCredentials.new(shared)
      end

      if @debug
        settings[:logger] = Logger.new(log.out)
        settings[:log_level] = :debug
        #settings[:http_wire_trace] = true
      end

      Aws::S3::Client.new(settings)
    end
  end

  # Timer watcher that invokes the given block on every tick. A StandardError
  # raised by the block is logged (message and backtrace) to the supplied
  # logger and is not propagated.
  class TimerWatcher < Coolio::TimerWatcher
    # interval, repeat - forwarded to Coolio::TimerWatcher.
    # log              - logger responding to #error and #error_backtrace.
    # callback         - block executed on every tick.
    def initialize(interval, repeat, log, &callback)
      @log = log
      @callback = callback
      super(interval, repeat)
    end

    # Called on each timer tick.
    def on_timer
      begin
        @callback.call
      rescue => err
        @log.error(err.message)
        @log.error_backtrace(err.backtrace)
      end
    end
  end # TimerWatcher
end # Fluent::ElbAccessLogInput