require 'td/client/api_error'
require 'td/client/version'
require 'td/client/api/access_control'
require 'td/client/api/account'
require 'td/client/api/bulk_import'
require 'td/client/api/bulk_load'
require 'td/client/api/database'
require 'td/client/api/export'
require 'td/client/api/import'
require 'td/client/api/job'
require 'td/client/api/partial_delete'
require 'td/client/api/result'
require 'td/client/api/schedule'
require 'td/client/api/server_status'
require 'td/client/api/table'
require 'td/client/api/user'

# Disables SSLv3 connections to protect against the POODLE attack
require 'td/core_ext/openssl/ssl/sslcontext/set_params'

module TreasureData

class API
  include API::AccessControl
  include API::Account
  include API::BulkImport
  include API::BulkLoad
  include API::Database
  include API::Export
  include API::Import
  include API::Job
  include API::PartialDelete
  include API::Result
  include API::Schedule
  include API::ServerStatus
  include API::Table
  include API::User

  # Endpoint used by #initialize when neither opts[:endpoint] nor
  # ENV['TD_API_SERVER'] provides one.
  DEFAULT_ENDPOINT = 'api.treasure-data.com'
  # Import endpoint host name; not referenced by the code visible in this file.
  DEFAULT_IMPORT_ENDPOINT = 'api-import.treasure-data.com'

  # Alternative host names; not referenced by the code visible in this file
  # (presumably consumed by callers - TODO confirm).
  NEW_DEFAULT_ENDPOINT = 'api.treasuredata.com'
  NEW_DEFAULT_IMPORT_ENDPOINT = 'api-import.treasuredata.com'

  # Raised by validate_content_length! when the number of bytes received differs
  # from the size announced in the response headers; do_get treats it as retryable.
  class IncompleteError < APIError; end

  # Builds an API client for the given key.
  #
  # The endpoint is taken from opts[:endpoint], then ENV['TD_API_SERVER'], then
  # DEFAULT_ENDPOINT. An http(s) URL determines host, port, SSL usage and base path
  # by itself; a bare "host" or "host:port" value relies on opts[:ssl] to choose
  # between SSL (default port 443) and plain HTTP (default port 80).
  #
  # @param [String] apikey
  # @param [Hash] opts recognized keys: :user_agent, :endpoint, :connect_timeout,
  #   :read_timeout, :send_timeout, :retry_post_requests, :retry_delay,
  #   :max_cumul_retry_delay, :ssl, :http_proxy, :headers
  def initialize(apikey, opts={})
    require 'json'
    require 'time'
    require 'uri'
    require 'net/http'
    require 'net/https'
    #require 'faraday' # faraday doesn't support streaming upload with httpclient yet so now disabled
    require 'httpclient'
    require 'zlib'
    require 'stringio'
    require 'cgi'
    require 'msgpack'

    @apikey = apikey
    base_agent = "TD-Client-Ruby: #{TreasureData::Client::VERSION}"
    # a caller supplied agent is put in front of the library's own identifier
    @user_agent = opts.has_key?(:user_agent) ? "#{opts[:user_agent]}; " + base_agent : base_agent

    endpoint = opts[:endpoint] || ENV['TD_API_SERVER'] || DEFAULT_ENDPOINT
    uri = URI.parse(endpoint)

    @connect_timeout = opts[:connect_timeout] || 60
    @read_timeout = opts[:read_timeout] || 600
    @send_timeout = opts[:send_timeout] || 600
    @retry_post_requests = opts[:retry_post_requests] || false
    @retry_delay = opts[:retry_delay] || 5
    @max_cumul_retry_delay = opts[:max_cumul_retry_delay] || 600

    if ['http', 'https'].include?(uri.scheme)
      @host = uri.host
      @port = uri.port
      # the opts[:ssl] option is ignored here; its value is
      #   overridden by the scheme of the endpoint URI
      @ssl = (uri.scheme == 'https')
      @base_path = uri.path.to_s
    else
      # a URI without an http(s) scheme must not carry a parsed port
      raise "Invalid endpoint: #{endpoint}" if uri.port

      # generic URI: plain "host" or "host:port"
      host, port = endpoint.split(':', 2)
      @host = host
      @port = port.to_i
      @ssl = opts[:ssl] ? true : false
      @port = (@ssl ? 443 : 80) if @port == 0
      @base_path = ''
    end

    @http_proxy = opts[:http_proxy] || ENV['HTTP_PROXY']
    @headers = opts[:headers] || {}
    @api = api_client("#{@ssl ? 'https' : 'http'}://#{@host}:#{@port}")
  end

  # TODO error check & raise appropriate errors

  # @!attribute [r] apikey
  #   @return [String] the API key given to #initialize; when set it is sent in
  #     the Authorization header as "TD1 <apikey>"
  attr_reader :apikey

  # Serializes a record to MessagePack after converting integers that MessagePack
  # cannot represent into strings.
  #
  # The record is modified in place: each Integer value outside the MessagePack
  # 64-bit range is replaced by its decimal String form. The check is done on the
  # numeric range rather than on the Bignum class, because Bignum is the same
  # constant as Integer since Ruby 2.4 (which would stringify every integer) and
  # no longer exists in Ruby 3.2 and later.
  #
  # @param [Hash] record
  # @param [IO] out optional sink; the packed bytes are appended to it with <<
  # @return [String, IO] the packed bytes, or out when it was given
  def self.normalized_msgpack(record, out = nil)
    # MessagePack integers span int64 (down to -2**63) and uint64 (up to 2**64 - 1)
    int64_min = -(2**63)
    uint64_max = 2**64 - 1

    record.keys.each { |k|
      v = record[k]
      if v.kind_of?(Integer) && (v < int64_min || v > uint64_max)
        record[k] = v.to_s
      end
    }

    packed = record.to_msgpack
    return packed unless out

    out << packed
    out
  end

  # Checks that a name has an allowed length and only contains lower-case letters,
  # digits and underscores.
  #
  # The pattern is anchored with \A and \z so that the whole string is checked;
  # with ^ and $ a multi-line value such as "abc\n!!" would pass because only one
  # of its lines has to match.
  #
  # @param [String] target kind of object being named (e.g. "table"); used in messages
  # @param [Fixnum] min_len minimum allowed length
  # @param [Fixnum] max_len maximum allowed length
  # @param [String] name the name to validate; converted with to_s first
  # @return [String] the validated name
  # @raise [ParameterValidationError] if target is not a non-empty String or the
  #   name is empty, has an invalid length or contains invalid characters
  def self.validate_name(target, min_len, max_len, name)
    if !target.instance_of?(String) || target.empty?
      raise ParameterValidationError,
            "A valid target name is required"
    end

    name = name.to_s
    if name.empty?
      raise ParameterValidationError,
            "Empty #{target} name is not allowed"
    end
    if name.length < min_len || name.length > max_len
      raise ParameterValidationError,
            "#{target.capitalize} name must be between #{min_len} and #{max_len} characters long. Got #{name.length} " +
            (name.length == 1 ? "character" : "characters") + "."
    end
    unless name =~ /\A[a-z0-9_]+\z/
      raise ParameterValidationError,
            "#{target.capitalize} name must only consist of lower-case alpha-numeric characters and '_'."
    end

    name
  end

  # Validates a database name through validate_name (3 to 255 characters).
  #
  # @param [String] name
  # @return the result of validate_name
  def self.validate_database_name(name)
    min_len, max_len = 3, 255
    validate_name("database", min_len, max_len, name)
  end

  # Validates a table name through validate_name (3 to 255 characters).
  #
  # @param [String] name
  # @return the result of validate_name
  def self.validate_table_name(name)
    min_len, max_len = 3, 255
    validate_name("table", min_len, max_len, name)
  end

  # Validates a result set name through validate_name (3 to 255 characters).
  #
  # @param [String] name
  # @return the result of validate_name
  def self.validate_result_set_name(name)
    min_len, max_len = 3, 255
    validate_name("result set", min_len, max_len, name)
  end

  # Validates a column name through validate_name (1 to 255 characters).
  #
  # @param [String] name
  # @return the result of validate_name
  def self.validate_column_name(name)
    min_len, max_len = 1, 255
    validate_name("column", min_len, max_len, name)
  end

  # Turns an arbitrary name into one made of lower-case letters, digits and
  # underscores that is between 3 and 255 characters long.
  #
  # Short names are padded with "_", names longer than 255 characters are cut to
  # 253 characters followed by "__", and every other character becomes "_".
  #
  # @param [String] name converted with to_s first
  # @return [String] the normalized name
  # @raise [RuntimeError] if the name is empty
  def self.normalize_database_name(name)
    normalized = name.to_s
    raise "Empty name is not allowed" if normalized.empty?

    normalized = normalized.ljust(3, "_")
    normalized = normalized[0, 253] + "__" if normalized.length > 255
    normalized.downcase.gsub(/[^a-z0-9_]/, '_')
  end

  # Normalizes a table name using the same rules as normalize_database_name.
  #
  # @param [String] name
  # @return the result of normalize_database_name
  def self.normalize_table_name(name)
    table_name = normalize_database_name(name)
    table_name
  end

  # TODO support array types
  #
  # Maps a loosely written type name onto one of the canonical names "int",
  # "long", "string", "float" or "double". Matching is case-insensitive and by
  # substring.
  #
  # The long/bigint patterns are tested before the int patterns: "bigint"
  # contains "int", so testing /int/ first would report it as "int".
  #
  # @param [String] name
  # @return [String] the canonical type name
  # @raise [RuntimeError] if the name matches none of the known types
  def self.normalize_type_name(name)
    case name
    when /long/i, /bigint/i
      "long"
    when /int/i, /integer/i
      "int"
    when /string/i
      "string"
    when /float/i
      "float"
    when /double/i
      "double"
    else
      raise "Type name must either of int, long, string float or double"
    end
  end

  # Produces a gzip stream that contains no data.
  # For fluent-plugin-td / td command to check table existence with an import-only user.
  #
  # @return [String] the bytes of an empty gzip stream
  def self.create_empty_gz_data
    buffer = StringIO.new
    writer = Zlib::GzipWriter.new(buffer)
    # closing right away writes only the gzip header and trailer
    writer.close
    buffer.string
  end

  # Sets the CA certificate file that new_client registers as trusted when SSL is
  # used. When it is never set, the ssl_ca_file reader falls back to a
  # data/ca-bundle.crt path relative to this file.
  #
  # @param [String] ssl_ca_file path of the CA certificate file
  def ssl_ca_file=(ssl_ca_file)
    @ssl_ca_file = ssl_ca_file
  end

private

  # Issues a GET request through do_get while the SSLv3 guard is active.
  #
  # @param [String] url
  # @param [Hash] params
  # @param [Hash] opt
  # @yield [response] forwarded to do_get
  # @return the result of do_get
  def get(url, params=nil, opt={}, &block)
    guard_no_sslv3 { do_get(url, params, opt, &block) }
  end

  # Performs a GET request with retries and optional resume support.
  #
  # Connection level errors and, when no block is given, responses with a status
  # of 500 or higher are retried with exponential back-off: the delay starts at
  # @retry_delay and doubles after every attempt, until the accumulated delay
  # reaches @max_cumul_retry_delay.
  #
  # @param [String] url path appended to the base path
  # @param [Hash] params request parameters
  # @param [Hash] opt when opt[:resume] is truthy, a failed transfer is continued
  #   with an If-Range / Range request based on the ETag of the last response
  # @yield [response, chunk, current_total_chunk_size] each received body chunk
  # @return [Array] status code as String, body and the response object
  def do_get(url, params=nil, opt={}, &block)
    client, header = new_client
    client.send_timeout = @send_timeout
    client.receive_timeout = @read_timeout

    header['Accept-Encoding'] = 'deflate, gzip'

    target = build_endpoint(url, @host)

    unless ENV['TD_CLIENT_DEBUG'].nil?
      puts "DEBUG: REST GET call:"
      puts "DEBUG:   header: " + header.to_s
      puts "DEBUG:   path:   " + target.to_s
      puts "DEBUG:   params: " + params.to_s
    end

    # exponential (base 2) back-off starting at 'retry_delay'
    retry_delay = @retry_delay
    cumul_retry_delay = 0

    # for both exceptions and 500+ errors retrying is enabled by default.
    # The cumulative retry delay should not exceed @max_cumul_retry_delay seconds
    response = nil
    etag = nil
    current_total_chunk_size = 0
    begin # this block is to allow retry (redo) in the begin part of the begin-rescue block
      begin
        if etag
          # continue after the bytes received so far, provided the resource is unchanged
          header['If-Range'] = etag
          header['Range'] = "bytes=#{current_total_chunk_size}-"
        else
          current_total_chunk_size = 0
        end
        if block
          response = client.get(target, params, header) {|res, chunk|
            current_total_chunk_size += chunk.bytesize
            block.call(res, chunk, current_total_chunk_size)
          }
        else
          response = client.get(target, params, header)
          current_total_chunk_size += response.body.bytesize
        end
        # XXX ext/openssl raises EOFError in case where underlying connection causes an error,
        #     but httpclient ignores it. Therefore, check content size.
        #     https://github.com/nahi/httpclient/issues/296
        validate_content_length!(response, current_total_chunk_size)

        status = response.code
        # retry if the HTTP error code is 500 or higher and we did not run out of retrying attempts
        if !block_given? && status >= 500 && cumul_retry_delay < @max_cumul_retry_delay
          $stderr.puts "Error #{status}: #{get_error(response)}. Retrying after #{retry_delay} seconds..."
          sleep retry_delay
          cumul_retry_delay += retry_delay
          retry_delay *= 2
          redo # restart from beginning of do-while loop
        end
      rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Timeout::Error, EOFError, OpenSSL::SSL::SSLError, SocketError, IncompleteError => e
        if opt[:resume]
          # response is still nil when the request failed before any response was
          # returned; there is no ETag to resume from then, so the next attempt
          # starts from the beginning.
          etag = response.header['ETag'].first if response
        elsif block_given?
          raise e
        end
        $stderr.print "#{e.class}: #{e.message}. "
        if cumul_retry_delay < @max_cumul_retry_delay
          $stderr.puts "Retrying after #{retry_delay} seconds..."
          sleep retry_delay
          cumul_retry_delay += retry_delay
          retry_delay *= 2
          retry
        else
          $stderr.puts "Retrying stopped after #{@max_cumul_retry_delay} seconds."
          raise e
        end
      end
    end while false

    unless ENV['TD_CLIENT_DEBUG'].nil?
      puts "DEBUG: REST GET response:"
      puts "DEBUG:   header: " + response.header.to_s
      puts "DEBUG:   status: " + response.code.to_s
      puts "DEBUG:   body:   " + response.body.to_s
    end

    # a streamed body has already been handed to the block chunk by chunk
    body = block ? response.body : inflate_body(response)

    return [response.code.to_s, body, response]
  end

  # Compares the number of bytes received with the size announced by the server.
  #
  # The expected size is the trailing number of the Content-Range header when
  # that header is present, otherwise the Content-Length header. Nothing is
  # checked when no size can be determined.
  #
  # @param [response] response
  # @param [Fixnum] body_size number of bytes received
  # @raise [IncompleteError] if the announced size differs from body_size
  def validate_content_length!(response, body_size)
    content_range = response.header['Content-Range'].first
    expected = content_range ? content_range[/\d+$/] : response.header['Content-Length'].first
    return if !expected || expected.to_i == body_size

    raise IncompleteError, "#{expected} bytes expected, but got #{body_size} bytes"
  end

  # Decompresses a response body according to its Content-Encoding header.
  #
  # @param [response] response
  # @return [String] the body as is when there is no Content-Encoding, the
  #   gunzipped body for "gzip", otherwise the zlib-inflated body
  def inflate_body(response)
    encodings = response.header['Content-Encoding']
    return response.body if encodings.empty?

    # NOTE maybe for content-encoding is msgpack.gz ?
    return Zlib::Inflate.inflate(response.body) unless encodings.include?('gzip')

    # a window size of MAX_WBITS + 16 makes zlib expect a gzip header
    inflater = Zlib::Inflate.new(Zlib::MAX_WBITS + 16)
    begin
      inflater.inflate(response.body)
    ensure
      inflater.close
    end
  end

  # Issues a POST request through do_post while the SSLv3 guard is active.
  #
  # @param [String] url
  # @param [Hash] params
  # @return the result of do_post
  def post(url, params=nil, &block)
    guard_no_sslv3 { do_post(url, params, &block) }
  end

  # Performs a POST request, optionally retrying failed attempts.
  #
  # Retrying is only done when @retry_post_requests is set. In that case both
  # connection level errors and responses with a status of 500 or higher are
  # retried with exponential back-off, starting at @retry_delay, until the
  # accumulated delay reaches @max_cumul_retry_delay.
  #
  # The block is never called here; passing one only makes the method return the
  # response body without inflating it.
  #
  # @param [String] url path appended to the base path
  # @param [Hash] params request parameters; an empty Hash is sent when nil
  # @return [Array] status code as String, body and the response object
  def do_post(url, params=nil, &block)
    target = build_endpoint(url, @host)

    client, header = new_client
    client.send_timeout       = @send_timeout
    client.receive_timeout    = @read_timeout
    header['Accept-Encoding'] = 'gzip'

    unless ENV['TD_CLIENT_DEBUG'].nil?
      puts "DEBUG: REST POST call:"
      puts "DEBUG:   header: " + header.to_s
      puts "DEBUG:   path:   " + target.to_s
      puts "DEBUG:   params: " + params.to_s
    end

    # exponential (base 2) back-off starting at 'retry_delay'
    retry_delay = @retry_delay
    cumul_retry_delay = 0

    # for both exceptions and 500+ errors retrying can be enabled by initialization
    # parameter 'retry_post_requests'. The cumulative retry delay should not
    # exceed @max_cumul_retry_delay seconds
    response = nil
    begin # this block is to allow retry (redo) in the begin part of the begin-rescue block
      begin
        response = client.post(target, params || {}, header)

        # if the HTTP error code is 500 or higher and the user requested retrying
        # on post request, attempt a retry
        status = response.code.to_i
        if @retry_post_requests && status >= 500 && cumul_retry_delay < @max_cumul_retry_delay
          $stderr.puts "Error #{status}: #{get_error(response)}. Retrying after #{retry_delay} seconds..."
          sleep retry_delay
          cumul_retry_delay += retry_delay
          retry_delay *= 2
          redo # restart from beginning of do-while loop
        end
      rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Timeout::Error, EOFError, OpenSSL::SSL::SSLError, SocketError => e
        $stderr.print "#{e.class}: #{e.message}. "
        if @retry_post_requests && cumul_retry_delay < @max_cumul_retry_delay
          $stderr.puts "Retrying after #{retry_delay} seconds..."
          sleep retry_delay
          cumul_retry_delay += retry_delay
          retry_delay *= 2
          retry
        else
          if @retry_post_requests
            $stderr.puts "Retrying stopped after #{@max_cumul_retry_delay} seconds."
          else
            # only terminates the line started by the $stderr.print above
            $stderr.puts ""
          end
          raise e
        end
      rescue => e
        # any other error is passed on to the caller without retrying
        raise e
      end
    end while false

    # the body is only inflated when no block was given
    body = block ? response.body : inflate_body(response)

    begin
      unless ENV['TD_CLIENT_DEBUG'].nil?
        puts "DEBUG: REST POST response:"
        puts "DEBUG:   header: " + response.header.to_s
        puts "DEBUG:   status: " + response.code.to_s
        puts "DEBUG:   body:   <omitted>"
      end
      return [response.code.to_s, body, response]
    ensure
      # Disconnect keep-alive connection explicitly here, not by GC.
      # Errors raised while resetting are ignored.
      client.reset(target) rescue nil
    end
  end

  # Uploads data with a PUT request. No retry is attempted.
  #
  # @param [String] url path appended to the base path
  # @param [String, StringIO] stream data to upload; the content of a StringIO is
  #   sent as a String, anything else is handed to the HTTP client as is
  # @param [Fixnum] size value sent as Content-Length
  # @param [Hash] opts opts[:host] replaces the configured host in the target URL
  # @return [Array] status code as String, response body and the response object
  def put(url, stream, size, opts = {})
    client, header = new_client(opts)
    client.send_timeout = @send_timeout
    client.receive_timeout = @read_timeout

    header['Content-Type'] = 'application/octet-stream'
    header['Content-Length'] = size.to_s

    # compared by class name, so the StringIO constant itself is not needed here
    payload = stream.class.name == 'StringIO' ? stream.string : stream
    target = build_endpoint(url, opts[:host] || @host)

    if !ENV['TD_CLIENT_DEBUG'].nil?
      puts "DEBUG: REST PUT call:"
      puts "DEBUG:   header: #{header}"
      puts "DEBUG:   target: #{target}"
      puts "DEBUG:   body:   #{payload}"
    end

    response = client.put(target, payload, header)
    begin
      if !ENV['TD_CLIENT_DEBUG'].nil?
        puts "DEBUG: REST PUT response:"
        puts "DEBUG:   header: #{response.header}"
        puts "DEBUG:   status: #{response.code}"
        puts "DEBUG:   body:   <omitted>"
      end
      [response.code.to_s, response.body, response]
    ensure
      # Disconnect keep-alive connection explicitly here, not by GC.
      begin
        client.reset(target)
      rescue StandardError
        nil
      end
    end
  end

  # Builds the absolute URL of a request from the configured scheme, port and
  # base path.
  #
  # @param [String] url path appended to the base path
  # @param [String] host
  # @return [String]
  def build_endpoint(url, host)
    scheme = @ssl ? 'https' : 'http'
    path = @base_path + url
    "#{scheme}://#{host}:#{@port}#{path}"
  end

  # Runs the block with the thread-local :SET_SSL_OP_NO_SSLv3 flag set to true and
  # puts the previous value back afterwards, also when the block raises.
  #
  # @yield Disable SSLv3 in given block
  # @return the value of the block
  def guard_no_sslv3
    thread = Thread.current
    previous = thread[:SET_SSL_OP_NO_SSLv3]
    begin
      # Disable SSLv3 connection: See Net::HTTP hack at the bottom
      thread[:SET_SSL_OP_NO_SSLv3] = true
      yield
    ensure
      # previous could be nil, but assigning nil to TLS means 'delete'
      thread[:SET_SSL_OP_NO_SSLv3] = previous
    end
  end

  # Creates an HTTP client together with the headers of a single request.
  #
  # With SSL enabled the client trusts the configured CA file, verifies the peer
  # and refuses SSLv3. The headers carry the Authorization (when an API key is
  # set) and Date entries; entries of @headers are merged last and therefore win.
  #
  # @param [Hash] opts not used by this method
  # @return [HTTPClient, Hash] the client and the request headers
  def new_client(opts = {})
    http = HTTPClient.new(@http_proxy, @user_agent)
    http.connect_timeout = @connect_timeout

    if @ssl
      ssl = http.ssl_config
      ssl.add_trust_ca(ssl_ca_file)
      ssl.verify_mode = OpenSSL::SSL::VERIFY_PEER
      # Disable SSLv3 connection in favor of POODLE Attack protection
      ssl.options |= OpenSSL::SSL::OP_NO_SSLv3
    end

    request_header = {}
    request_header['Authorization'] = "TD1 #{apikey}" if @apikey
    request_header['Date'] = Time.now.rfc2822
    request_header.merge!(@headers)

    [http, request_header]
  end

  # Creates the HTTP client used by #api for JSON requests.
  #
  # The default headers start from @headers; the Authorization (when an API key
  # is set) and Content-Type entries are written afterwards and therefore win.
  #
  # @param [String] endpoint base URL of the API
  # @return [HTTPClient]
  def api_client(endpoint)
    default_header = {}.merge(@headers)
    default_header['Authorization'] = "TD1 #{apikey}" if @apikey
    default_header['Content-Type'] = 'application/json; charset=utf-8'

    HTTPClient.new(
      :proxy => @http_proxy,
      :agent_name => @user_agent,
      :base_url => endpoint,
      :default_header => default_header
    ).tap do |http|
      http.connect_timeout = @connect_timeout
      http.send_timeout = @send_timeout
      http.receive_timeout = @read_timeout
      http.transparent_gzip_decompression = true
      # log the HTTP traffic to standard output in debug mode
      http.debug_dev = STDOUT unless ENV['TD_CLIENT_DEBUG'].nil?
    end
  end

  # Evaluates the block in the context of the shared API client (@api) and
  # returns the response, retrying failed attempts when requested.
  #
  # With opt[:retry_request] set, connection level errors and responses with a
  # status of 500 or higher are retried with exponential back-off, starting at
  # @retry_delay, until the accumulated delay reaches @max_cumul_retry_delay.
  #
  # @param [Hash] opt :retry_request enables retrying (enabled by default)
  # @yield evaluated with instance_eval on the API client; must return a response
  # @return the response returned by the block
  # @raise the connection error of the last attempt; once retrying has been given
  #   up its message is extended with the number of retries and the time spent
  def api(opt = {:retry_request => true}, &block)
    retry_request = opt[:retry_request]
    # exponential (base 2) back-off starting at 'retry_delay'
    retry_delay = @retry_delay
    retry_times = 0
    cumul_retry_delay = 0

    # for both exceptions and 500+ errors retrying is controlled by the
    # :retry_request option. The cumulative retry delay should not exceed
    # @max_cumul_retry_delay seconds
    begin # this block is to allow retry (redo) in the begin part of the begin-rescue block
      begin
        response = @api.instance_eval(&block)

        # if the HTTP error code is 500 or higher and retrying was requested,
        # attempt a retry
        status = response.code.to_i
        if retry_request && status >= 500 && cumul_retry_delay < @max_cumul_retry_delay
          $stderr.puts "Error #{status}: #{get_error(response)}. Retrying after #{retry_delay} seconds..."
          sleep retry_delay
          cumul_retry_delay += retry_delay
          retry_delay *= 2
          redo # restart from beginning of do-while loop
        end
        return response
      rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Timeout::Error, EOFError, OpenSSL::SSL::SSLError, SocketError => e
        $stderr.print "#{e.class}: #{e.message}. "
        if retry_request
          if cumul_retry_delay < @max_cumul_retry_delay
            $stderr.puts "Retrying after #{retry_delay} seconds..."
            sleep retry_delay
            cumul_retry_delay += retry_delay
            retry_delay *= 2
            retry_times += 1
            retry
          else
            $stderr.puts "Retrying stopped after #{@max_cumul_retry_delay} seconds."
            # The message of an exception can be a frozen string, so appending to
            # it in place may fail. Exception#exception returns a copy of the
            # exception that carries the extended message instead.
            e = e.exception("#{e.message} (Retried #{retry_times} times in #{cumul_retry_delay} seconds)")
          end
        else
          $stderr.puts "No retry should be performed."
        end
        raise e
      end
    end while false
  end

  # Path of the CA certificate file used for SSL connections.
  #
  # @return [String] the file set through ssl_ca_file=, otherwise the
  #   data/ca-bundle.crt path three directories above this file (memoized)
  def ssl_ca_file
    return @ssl_ca_file if @ssl_ca_file

    @ssl_ca_file = File.join(File.dirname(__FILE__), '..', '..', '..', 'data', 'ca-bundle.crt')
  end

  # Extracts the error message of an error response.
  #
  # @param [response] res
  # @return [String] the 'message' entry produced by parse_error_response
  def get_error(res)
    details = parse_error_response(res)
    details['message']
  end

  # Extracts the error details of an error response.
  #
  # The body is parsed as JSON. For a JSON object the message is its 'message'
  # (or else 'error') entry and the stack trace its 'stacktrace' entry. When the
  # body parses to nothing the reason phrase of the response is used. A body that
  # is not JSON, or that is JSON but not an object (an array or a number, for
  # example), is used as the message as it is.
  #
  # @param [response] res
  # @return [Hash] with a 'message' entry and, for JSON objects, a 'stacktrace' entry
  def parse_error_response(res)
    error = {}

    begin
      js = JSON.load(res.body)
      case js
      when nil
        error['message'] = res.reason
      when Hash
        error['message']    = js['message'] || js['error']
        error['stacktrace'] = js['stacktrace']
      else
        # indexing an Array or a number with a String key would raise
        error['message'] = res.body
      end
    rescue JSON::ParserError
      error['message'] = res.body
    end

    error
  end

  # Raises the exception that corresponds to an error response.
  #
  # Without klass the class follows the status code: 404 NotFoundError,
  # 409 AlreadyExistsError, 401 AuthError, 403 ForbiddenError, anything else
  # APIError. The message is "<msg>: <error message>"; it is prefixed with the
  # status code when klass is given or when the generic APIError is used.
  # Classes that define api_backtrace also receive the server side stack trace.
  #
  # @param [String] msg
  # @param [response] res
  # @param [Class] klass exception class to raise instead of the status based one
  def raise_error(msg, res, klass=nil)
    status_code = res.code.to_s
    error       = parse_error_response(res)
    plain       = "#{msg}: #{error['message']}"
    with_status = "#{status_code}: #{plain}"

    error_class, message =
      if klass
        [klass, with_status]
      else
        case status_code
        when "404" then [NotFoundError, plain]
        when "409" then [AlreadyExistsError, plain]
        when "401" then [AuthError, plain]
        when "403" then [ForbiddenError, plain]
        else [APIError, with_status]
        end
      end

    raise error_class.new(message, error['stacktrace']) if error_class.method_defined?(:api_backtrace)

    raise error_class, message
  end

  if ''.respond_to?(:encode)
    # Percent-escapes a value byte by byte after converting it to UTF-8.
    #
    # @param [String] s
    # @return [String]
    # * ' ' and '+' must be escaped into %20 and %2B because escaped text may be
    #   used as both URI and query.
    # * '.' must be escaped as %2E because it may be confused with extension.
    def e(s)
      octets = s.to_s.encode(Encoding::UTF_8).force_encoding(Encoding::ASCII_8BIT)
      octets.gsub(/[^\-_!~*'()~0-9A-Z_a-z]/) { |byte| format('%%%02X', byte.ord) }
    end
  else
    # Percent-escapes a value on Ruby versions whose strings have no encoding support.
    #
    # @param [String] s
    # @return [String]
    # * ' ' and '+' must be escaped into %20 and %2B because escaped text may be
    #   used as both URI and query.
    # * '.' must be escaped as %2E because it may be confused with extension.
    def e(s)
      s.to_s.gsub(/[^\-_!~*'()~0-9A-Z_a-z]/) { |byte| format('%%%02X', byte.ord) }
    end
  end

  # Parses an API response body and checks that it is a JSON object which has
  # all required keys.
  #
  # A required key counts as missing only when it is absent or null, so a key
  # that legitimately holds false is accepted.
  #
  # @param [String] body
  # @param [Array] required keys that must be present in the parsed object
  # @return [Hash] the parsed object
  # @raise [RuntimeError] if the body cannot be parsed, is not a JSON object or
  #   lacks a required key
  def checked_json(body, required = [])
    begin
      js = JSON.load(body)
    rescue => err
      raise "Unexpected API response: #{err}"
    end
    raise "Unexpected API response: #{body}" unless js.is_a?(Hash)

    if required.any? { |k| js[k].nil? }
      raise "Unexpected API response: #{body}"
    end
    js
  end
end

end # module TreasureData