require 'csv'
require 'json'
require 'zip'

module Fluent
  class S3InputOutput < Output

    Fluent::Plugin.register_output('s3_input', self)

    # Define `router` method of v0.12 to support v0.10 or earlier
    unless method_defined?(:router)
      define_method("router") { Fluent::Engine }
    end

    config_param :aws_key_id, :string, :default => ENV['AWS_ACCESS_KEY_ID'], :secret => true
    config_param :aws_sec_key, :string, :default => ENV['AWS_SECRET_ACCESS_KEY'], :secret => true
    config_param :aws_region, :string, :default => "us-east-1"
    config_param :s3_bucket_key
    config_param :s3_object_key_key
    config_param :tag
    config_param :merge_record, :bool, :default => false
    config_param :record_key, :string, :default => nil
    config_param :remove_keys, :array, :default => []
    config_param :time_keys, :array, :default => []
    config_param :time_format, :string, :default => "%Y-%m-%dT%H:%M:%S"
    config_param :gzip_exts, :array, :default => []
    config_param :zip_exts, :array, :default => []
    config_param :format, :string, :default => 'json'

    attr_accessor :s3

    def initialize
      super
      require 'net/http'
      require 'oj'
      require 'aws-sdk'
    end

    def configure(conf)
      super

      if @aws_key_id and @aws_sec_key
        @s3 = Aws::S3::Client.new(
          region: @aws_region,
          access_key_id: @aws_key_id,
          secret_access_key: @aws_sec_key,
        )
      else
        @s3 = Aws::S3::Client.new(
          region: @aws_region,
        )
      end
    end
    
    # Allow JSON data in a couple of formats
    # {} single event
    # [{},{}] array of events
    # {}\n{}\n{} concatenated events (flume)
    def normalize_json(json)
      if json[0] != "["
        json=json.gsub /}\n{/,"},{" 
        json="[#{json}]"
      end
      json
    end

    def emit(tag, es, chain)
      begin
        tag_parts = tag.split('.')
        es.each { |time, record|
          s3_bucket = record[s3_bucket_key]
          s3_key = record[s3_object_key_key]
          s3_key_ext = s3_key.split(".")[-1]
          resp = s3.get_object(bucket: s3_bucket, key: s3_key) 

          if @gzip_exts.include?(s3_key_ext)
            input = Zlib::GzipReader.new(resp.body)
          elsif @zip_exts.include?(s3_key_ext)
            io = Zip::InputStream.new(resp.body)
            input = io.get_next_entry
            #input = Zip::File.open(resp.body).entries.first.get_input_stream
          else
            input = resp.body
          end

          new_record = {}
          if @merge_record
            new_record = {}.merge(record)
          end
   
          s3_record = {} 
          if @format == 'json'
            json_data=normalize_json input.read
            begin
              s3_record = Oj.load(json_data)
            rescue Oj::ParseError=>e
              puts "Failure parsing: "
              puts json_data.to_s
              puts "Error: #{e.to_s}"
            end
          elsif @format == 'csv'
            data = input.read
            File.open("/tmp/s3debug", 'w') { |file| file.write(data) } 
            s3_record=CSV.parse(data).to_json
          else
            raise "Unsupported format - #{@format}"
          end

          # parse the time from the record
          @time_keys.each do |time_key|
            puts "Look for #{time_key} in #{new_record}"
            if s3_record.include? time_key
              puts "Reset time for #{time_key}"
              time=Time.strptime(new_record[time_key], @time_format).to_i
              puts "Setting time to #{time}"
              break
            end
          end

          if @record_key == nil  
            tmp_record=s3_record.merge(new_record)
            new_record=tmp_record
          else
            new_record[record_key]=s3_record
          end
        
          @remove_keys.each do |key_to_remove|
            new_record.delete(key_to_remove)
          end

          router.emit(@tag, time, new_record)
        }
        chain.next
      rescue StandardError => e
        $log.warn "s3_input: #{e.class} #{e.message} #{e.backtrace.join(', ')}"
      end
    end
  end
end