require 'td/command/options'

module TreasureData
module Command
  include Options

  # TODO
  JOB_WAIT_MAX_RETRY_COUNT_ON_NETWORK_ERROR = 10

  PRIORITY_FORMAT_MAP = {
    -2 => 'VERY LOW',
    -1 => 'LOW',
    0 => 'NORMAL',
    1 => 'HIGH',
    2 => 'VERY HIGH',
  }

  PRIORITY_PARSE_MAP = {
    /\Avery[ _\-]?low\z/i => -2,
    /\A-2\z/ => -2,
    /\Alow\z/i => -1,
    /\A-1\z/ => -1,
    /\Anorm(?:al)?\z/i => 0,
    /\A[\-\+]?0\z/ => 0,
    /\Ahigh\z/i => 1,
    /\A[\+]?1\z/ => 1,
    /\Avery[ _\-]?high\z/i => 2,
    /\A[\+]?2\z/ => 2,
  }

  def job_list(op)
    page = 0
    skip = 0
    status = nil
    slower_than = nil

    op.on('-p', '--page PAGE', 'skip N pages', Integer) {|i|
      page = i
    }
    op.on('-s', '--skip N', 'skip N jobs', Integer) {|i|
      skip = i
    }
    op.on('-R', '--running', 'show only running jobs', TrueClass) {|b|
      status = 'running'
    }
    op.on('-S', '--success', 'show only succeeded jobs', TrueClass) {|b|
      status = 'success'
    }
    op.on('-E', '--error', 'show only failed jobs', TrueClass) {|b|
      status = 'error'
    }
    op.on('--slow [SECONDS]', 'show slow queries (default threshold: 3600 seconds)', Integer) {|i|
      slower_than = i || 3600
    }

    set_render_format_option(op)

    max = op.cmd_parse

    max = (max || 20).to_i

    client = get_client

    if page
      skip += max * page
    end

    conditions = nil
    if slower_than
      conditions = {:slower_than => slower_than}
    end

    jobs = client.jobs(skip, skip + max - 1, status, conditions)

    rows = []
    jobs.each {|job|
      job.auto_update_status = false
      start = job.start_at
      elapsed = Command.humanize_elapsed_time(start, job.end_at)
      cpu_time = Command.humanize_time(job.cpu_time, true)
      priority = job_priority_name_of(job.priority)
      query = (op.render_format == 'table' || op.render_format.nil? ? job.query.to_s[0,50] + " ..." : job.query)
      rows << {
        :JobID => job.job_id,
        :Database => job.db_name,
        :Status => job.status,
        :Type => job.type,
        :Query => query,
        :Start => (start ? start.localtime : ''),
        :Elapsed => elapsed.rjust(11),
        :CPUTime => cpu_time.rjust(17),
        :ResultSize => (job.result_size ? Command.humanize_bytesize(job.result_size, 2) : ""),
        :Priority => priority,
        :Result => job.result_url,
        :Duration => job.duration ? Time.at(job.duration).utc.strftime("%X") : nil
      }
    }

    $stdout.puts cmd_render_table(rows,
      :fields => [:JobID, :Status, :Start, :Elapsed, :CPUTime, :ResultSize, :Priority, :Result, :Type, :Database, :Query, :Duration],
      :max_width => 1000,
      :render_format => op.render_format
    )
  end

  def job_show(op)
    options = job_show_options(op)
    job_id = op.cmd_parse

    verbose     = options[:verbose]
    wait        = options[:wait]
    output      = options[:output]
    format      = options[:format]
    render_opts = options[:render_opts]
    limit       = options[:limit]
    exclude     = options[:exclude]

    if output.nil? && format
      unless ['tsv', 'csv', 'json'].include?(format)
        raise ParameterConfigurationError,
              "Supported formats are only tsv, csv and json without -o / --output option"
      end
    end

    if render_opts[:header]
      unless ['json', 'tsv', 'csv'].include?(format)
        raise ParameterConfigurationError,
              "Option -c / --column-header is only supported with json, tsv and csv formats"
      end
    end

    if !output.nil? && !limit.nil?
      raise ParameterConfigurationError,
            "Option -l / --limit is only valid when not outputting to file (no -o / --output option provided)"
    end

    get_and_show_result(job_id, wait, exclude, output, limit, format, render_opts, verbose)
  end

  def job_status(op)
    job_id = op.cmd_parse
    client = get_client

    $stdout.puts client.job_status(job_id)
  end

  def job_kill(op)
    job_id = op.cmd_parse

    client = get_client

    former_status = client.kill(job_id)
    if TreasureData::Job::FINISHED_STATUS.include?(former_status)
      $stderr.puts "Job #{job_id} is already finished (#{former_status})"
      exit 0
    end

    if former_status == TreasureData::Job::STATUS_RUNNING
      $stderr.puts "Job #{job_id} is killed."
    else
      $stderr.puts "Job #{job_id} is canceled."
    end
  end

private

  def get_and_show_result(job_id, wait, exclude = false, output = nil, limit = nil, format = nil, render_opts = {}, verbose = false)
    client = get_client
    job = client.job(job_id)

    $stdout.puts "JobID       : #{job.job_id}"
    #puts "URL         : #{job.url}"
    $stdout.puts "Status      : #{job.status}"
    $stdout.puts "Type        : #{job.type}"
    $stdout.puts "Database    : #{job.db_name}"
    # exclude some fields from bulk_import_perform type jobs
    if [:hive, :pig, :impala, :presto].include?(job.type)
      $stdout.puts "Priority    : #{job_priority_name_of(job.priority)}"
      $stdout.puts "Retry limit : #{job.retry_limit}"
      $stdout.puts "Output      : #{job.result_url}"
      $stdout.puts "Query       : #{job.query}"
    elsif job.type == :bulk_import_perform
      $stdout.puts "Destination : #{job.query}"
    elsif job.type == :bulkload
      require 'yaml'
      $stdout.puts "Config      :\n#{YAML.dump(job.query)}"
    end
    # if the job is done and is of type hive, show the Map-Reduce cumulated CPU time
    if job.finished?
      if [:hive].include?(job.type)
        $stdout.puts "CPU time    : #{Command.humanize_time(job.cpu_time, true)}"
      end
      if [:hive, :pig, :impala, :presto].include?(job.type)
        $stdout.puts "Result size : #{Command.humanize_bytesize(job.result_size, 2)}"
      end
    end

    if wait && !job.finished?
      wait_job(job)
      if [:hive, :pig, :impala, :presto].include?(job.type) && !exclude
        show_result_with_retry(job, output, limit, format, render_opts)
      end
    else
      if [:hive, :pig, :impala, :presto].include?(job.type) && !exclude && job.finished?
        show_result_with_retry(job, output, limit, format, render_opts)
      end

      if verbose
        if !job.debug['cmdout'].nil?
          $stdout.puts ""
          $stdout.puts "Output:"
          job.debug['cmdout'].to_s.split("\n").each {|line|
            $stdout.puts "  " + line
          }
        end
        if !job.debug['stderr'].nil?
          $stdout.puts ""
          $stdout.puts "Details:"
          job.debug['stderr'].to_s.split("\n").each {|line|
            $stdout.puts "  " + line
          }
        end
      end
    end

    $stdout.puts "\rUse '-v' option to show detailed messages." + " " * 20 unless verbose
  end

  def wait_job(job, first_call = false, wait = nil)
    $stderr.puts "queued..."

    cmdout_lines = 0
    stderr_lines = 0
    max_error_counts = JOB_WAIT_MAX_RETRY_COUNT_ON_NETWORK_ERROR

    job.wait(wait, detail: true, verbose: true) do
      cmdout = job.debug['cmdout'].to_s.split("\n")[cmdout_lines..-1] || []
      stderr = job.debug['stderr'].to_s.split("\n")[stderr_lines..-1] || []
      (cmdout + stderr).each {|line|
        $stdout.puts "  "+line
      }
      cmdout_lines += cmdout.size
      stderr_lines += stderr.size
    end
  end

  def show_result_with_retry(job, output, limit, format, render_opts)
    # up to 7 retries with exponential (base 2) back-off starting at 'retry_delay'
    retry_delay = 5
    max_cumul_retry_delay = 200
    cumul_retry_delay = 0

    $stdout.puts "Result      :"
    begin
      show_result(job, output, limit, format, render_opts)
    rescue TreasureData::NotFoundError => e
      # Got 404 because result not found.
    rescue TreasureData::APIError, # HTTP status code 500 or more
            Errno::ECONNREFUSED, Errno::ECONNRESET, Timeout::Error, EOFError,
            OpenSSL::SSL::SSLError, SocketError => e
      # don't retry on 300 and 400 errors
      if e.class == TreasureData::APIError && e.message !~ /^5\d\d:\s+/
        raise e
      end
      if cumul_retry_delay > max_cumul_retry_delay
        raise e
      end
      $stderr.puts "Error #{e.class}: #{e.message}. Retrying after #{retry_delay} seconds..."
      sleep retry_delay
      cumul_retry_delay += retry_delay
      retry_delay *= 2
      retry
    end
  end

  def show_result(job, output, limit, format, render_opts={})
    if output
      write_result(job, output, limit, format, render_opts)
      $stdout.puts "\rwritten to #{output} in #{format} format" + " " * 50
    else
      # every format that is allowed on stdout
      render_result(job, limit, format, render_opts)
    end
  end

  def write_result(job, output, limit, format, render_opts={})

    # the next 3 formats allow writing to both a file and stdout


    if output
      if output.is_a?(String)
        tempfile = "#{output}.tmp"
      else # File or Tempfile
        tempfile = "#{output.path}.tmp"
      end
    end

    case format
    when 'json'
      if render_opts[:header] && job.hive_result_schema
        headers = job.hive_result_schema.map {|name, _| name }

        write_result_for_json(job, output, tempfile, limit, render_opts) {|row|
          Hash[headers.zip(row)]
        }
      else
        write_result_for_json(job, output, tempfile, limit, render_opts) {|row| row }
      end
    when 'csv'
      require 'yajl'
      require 'csv'

      open_file(tempfile || output, "w") {|f|
        writer = CSV.new(f)
        # output headers
        if render_opts[:header] && job.hive_result_schema
          writer << job.hive_result_schema.map {|name, type|
            name
          }
        end
        # output data
        n_rows = 0
        unless output.nil?
          indicator = Command::SizeBasedDownloadProgressIndicator.new(
            "NOTE: the job result is being written to #{output} in csv format",
            job.result_size, 0.1, 1)
        end
        job.result_each_with_compr_size {|row, compr_size|
          # TODO limit the # of columns
          writer << row.map {|col|
            dump_column(col, render_opts[:null_expr])
          }
          n_rows += 1
          if n_rows % 100 == 0 # flush every 100 recods
            writer.flush
            indicator.update(compr_size) unless output.nil?
          end
          break if output.nil? and !limit.nil? and n_rows == limit
        }
        indicator.finish unless output.nil?
      }

    when 'tsv'
      require 'yajl'

      open_file(tempfile || output, "w") {|f|
        # output headers
        if render_opts[:header] && job.hive_result_schema
          f.write job.hive_result_schema.map {|name, type| name}.join("\t") + "\n"
        end
        # output data
        n_rows = 0
        unless output.nil?
          indicator = Command::SizeBasedDownloadProgressIndicator.new(
            "NOTE: the job result is being written to #{output} in tsv format",
            job.result_size, 0.1, 1)
        end

        job.result_each_with_compr_size {|row, compr_size|
          f.write row.map {|col| dump_column(col, render_opts[:null_expr])}.join("\t") + "\n"
          n_rows += 1
          if n_rows % 100 == 0
            f.flush # flush every 100 recods
            indicator.update(compr_size) unless output.nil?
          end
          break if output.nil? and !limit.nil? and n_rows == limit
        }
        indicator.finish unless output.nil?
      }

    # these last 2 formats are only valid if writing the result to file through the -o/--output option.

    when 'msgpack'
      if output.nil?
        raise ParameterConfigurationError,
              "Format 'msgpack' does not support writing to stdout"
      end
      open_file(tempfile || output, "wb") {|f|
        indicator = Command::SizeBasedDownloadProgressIndicator.new(
          "NOTE: the job result is being written to #{output} in msgpack format",
          job.result_size, 0.1, 1)
        job.result_format('msgpack', f) {|compr_size|
          indicator.update(compr_size)
        }
        indicator.finish
      }

    when 'msgpack.gz'
      if output.nil?
        raise ParameterConfigurationError,
              "Format 'msgpack' does not support writing to stdout"
      end
      open_file(tempfile || output, "wb") {|f|
        indicator = Command::SizeBasedDownloadProgressIndicator.new(
          "NOTE: the job result is being written to #{output} in msgpack.gz format",
          job.result_size, 0.1, 1)
        job.result_raw('msgpack.gz', f) {|compr_size|
          indicator.update(compr_size)
        }
        indicator.finish
      }

    else
      raise "Unknown format #{format.inspect}"
    end

    if tempfile && File.exists?(tempfile)
      FileUtils.mv(tempfile, output.respond_to?(:path) ? output.path : output)
    end
  end

  def open_file(output, mode)
    f = nil
    if output.nil?
      yield STDOUT
    else
      f = File.open(output, mode)
      yield f
    end
  ensure
    if f
      f.close unless f.closed?
    end
  end

  def write_result_for_json(job, output, tempfile, limit, render_opts)
    require 'yajl'
    open_file(tempfile || output, "w") {|f|
      f.write "["
      n_rows = 0
      unless output.nil?
        indicator = Command::SizeBasedDownloadProgressIndicator.new(
          "NOTE: the job result is being written to #{output} in json format",
          job.result_size, 0.1, 1)
      end
      job.result_each_with_compr_size {|row, compr_size|
        indicator.update(compr_size) unless output.nil?
        f.write ",\n" if n_rows > 0
        f.write Yajl.dump(yield(row))
        n_rows += 1
        break if output.nil? and !limit.nil? and n_rows == limit
      }
      f.write "]"
      indicator.finish unless output.nil?
    }
    $stdout.puts if output.nil?
  end

  def render_result(job, limit, format=nil, render_opts={})
    require 'yajl'

    if format.nil?
      # display result in tabular format
      rows = []
      n_rows = 0

      indicator = Command::SizeBasedDownloadProgressIndicator.new(
        "WARNING: the job result is being downloaded...", job.result_size, 0.1, 1)
      job.result_each_with_compr_size {|row, compr_size|
        indicator.update(compr_size)
        rows << row.map {|v|
          dump_column_safe_utf8(v, render_opts[:null_expr])
        }
        n_rows += 1
        break if !limit.nil? and n_rows == limit
      }
      $stdout.print " " * 100, "\r" # make sure the previous WARNING is cleared over

      render_opts[:max_width] = 10000
      if job.hive_result_schema
        render_opts[:change_fields] = job.hive_result_schema.map { |name,type| name }
      end

      $stdout.print "\r" + " " * 50
      $stdout.puts "\r" + cmd_render_table(rows, render_opts)
    else
      # display result in any of: json, csv, tsv.
      # msgpack and mspgpack.gz are not supported for stdout output
      write_result(job, nil, limit, format, render_opts)
    end
  end

  def dump_column(v, null_expr = nil)
    v = null_expr if v.nil? && null_expr

    s = v.is_a?(String) ? v.to_s : Yajl.dump(sanitize_infinite_value(v))
    # CAUTION: msgpack-ruby populates byte sequences as Encoding.default_internal which should be BINARY
    s = s.force_encoding('BINARY') if s.respond_to?(:encode)
    s
  end

  def dump_column_safe_utf8(v, null_expr = false)
    s = dump_column(v, null_expr)
    # Here does UTF-8 -> UTF-16LE -> UTF8 conversion:
    #   a) to make sure the string doesn't include invalid byte sequence
    #   b) to display multi-byte characters as it is
    #   c) encoding from UTF-8 to UTF-8 doesn't check/replace invalid chars
    #   d) UTF-16LE was slightly faster than UTF-16BE, UTF-32LE or UTF-32BE
    s = s.encode('UTF-16LE', 'UTF-8', :invalid=>:replace, :undef=>:replace).encode!('UTF-8') if s.respond_to?(:encode)
    s
  end

  def sanitize_infinite_value(v)
    case v
    when Float
      v.finite? ? v : v.to_s
    when Hash, Array
      Marshal.load(Marshal.dump(v), ->(x){(x.is_a?(Float) && !x.finite?) ? x.to_s : x})
    else
      v
    end
  end

  def job_priority_name_of(id)
    PRIORITY_FORMAT_MAP[id] || 'NORMAL'
  end

  def job_priority_id_of(name)
    PRIORITY_PARSE_MAP.each_pair {|pattern,id|
      return id if pattern.match(name)
    }
    return nil
  end

end # module Command
end # module TrasureData