class TreasureData::API
module Job

  ####
  ## Job API
  ##

  # @param [Fixnum] from
  # @param [Fixnum] to
  # @param [String] status
  # @param [Hash] conditions
  # @return [Array]
  def list_jobs(from=0, to=nil, status=nil, conditions=nil)
    params = {}
    params['from'] = from.to_s if from
    params['to'] = to.to_s if to
    params['status'] = status.to_s if status
    params.merge!(conditions) if conditions
    code, body, res = get("/v3/job/list", params)
    if code != "200"
      raise_error("List jobs failed", res)
    end
    js = checked_json(body, %w[jobs])
    result = []
    js['jobs'].each {|m|
      job_id = m['job_id']
      type = (m['type'] || '?').to_sym
      database = m['database']
      status = m['status']
      query = m['query']
      start_at = m['start_at']
      end_at = m['end_at']
      cpu_time = m['cpu_time']
      result_size = m['result_size'] # compressed result size in msgpack.gz format
      result_url = m['result']
      priority = m['priority']
      retry_limit = m['retry_limit']
      duration = m['duration']
      result << [job_id, type, status, query, start_at, end_at, cpu_time,
                 result_size, result_url, priority, retry_limit, nil, database,
                 duration]
    }
    return result
  end

  # @param [String] job_id
  # @return [Array]
  def show_job(job_id)
    # use v3/job/status instead of v3/job/show to poll finish of a job
    code, body, res = get("/v3/job/show/#{e job_id}")
    if code != "200"
      raise_error("Show job failed", res)
    end
    js = checked_json(body, %w[status])
    # TODO debug
    type = (js['type'] || '?').to_sym  # TODO
    database = js['database']
    query = js['query']
    status = js['status']
    debug = js['debug']
    url = js['url']
    start_at = js['start_at']
    end_at = js['end_at']
    cpu_time = js['cpu_time']
    result_size = js['result_size'] # compressed result size in msgpack.gz format
    result = js['result'] # result target URL
    hive_result_schema = (js['hive_result_schema'] || '')
    if hive_result_schema.empty?
      hive_result_schema = nil
    else
      begin
        hive_result_schema = JSON.parse(hive_result_schema)
      rescue JSON::ParserError => e
        # this is a workaround for a Known Limitation in the Pig Engine which does not set a default, auto-generated
        #   column name for anonymous columns (such as the ones that are generated from UDF like COUNT or SUM).
        # The schema will contain 'nil' for the name of those columns and that breaks the JSON parser since it violates
        #   the JSON syntax standard.
        if type == :pig and hive_result_schema !~ /[\{\}]/
          begin
            # NOTE: this works because a JSON 2 dimensional array is the same as a Ruby one.
            #   Any change in the format for the hive_result_schema output may cause a syntax error, in which case
            #   this lame attempt at fixing the problem will fail and we will be raising the original JSON exception
            hive_result_schema = eval(hive_result_schema)
          rescue SyntaxError => ignored_e
            raise e
          end
          hive_result_schema.each_with_index {|col_schema, idx|
            if col_schema[0].nil?
              col_schema[0] = "_col#{idx}"
            end
          }
        else
          raise e
        end
      end
    end
    priority = js['priority']
    retry_limit = js['retry_limit']
    return [type, query, status, url, debug, start_at, end_at, cpu_time,
            result_size, result, hive_result_schema, priority, retry_limit, nil, database]
  end

  # @param [String] job_id
  # @return [String] HTTP status
  def job_status(job_id)
    code, body, res = get("/v3/job/status/#{e job_id}")
    if code != "200"
      raise_error("Get job status failed", res)
    end

    js = checked_json(body, %w[status])
    return js['status']
  end

  # @param [String] job_id
  # @return [Array]
  def job_result(job_id)
    code, body, res = get("/v3/job/result/#{e job_id}", {'format'=>'msgpack'}, {:resume => true})
    if code != "200"
      raise_error("Get job result failed", res)
    end
    result = []
    MessagePack::Unpacker.new.feed_each(body) {|row|
      result << row
    }
    return result
  end

  # block is optional and must accept 1 parameter
  #
  # @param [String] job_id
  # @param [String] format
  # @param [IO] io
  # @param [Proc] block
  # @return [nil, String]
  def job_result_format(job_id, format, io=nil, &block)
    if io
      infl = nil
      code, body, res = get("/v3/job/result/#{e job_id}", {'format'=>format}, {:resume => true}) {|res, chunk, current_total_chunk_size|
        if res.code != 200
          raise_error("Get job result failed", res)
        end

        infl ||= create_inflalte_or_null_inflate(res)

        io.write infl.inflate(chunk)
        block.call(current_total_chunk_size) if block_given?
      }
      nil
    else
      code, body, res = get("/v3/job/result/#{e job_id}", {'format'=>format}, {:resume => true})
      if code != "200"
        raise_error("Get job result failed", res)
      end
      body
    end
  end

  # block is optional and must accept 1 argument
  #
  # @param [String] job_id
  # @param [Proc] block
  # @return [nil]
  def job_result_each(job_id, &block)
    upkr = MessagePack::Unpacker.new
    infl = nil

    get("/v3/job/result/#{e job_id}", {'format'=>'msgpack'}, {:resume => true}) {|res, chunk, current_total_chunk_size|
      if res.code != 200
        raise_error("Get job result failed", res)
      end

      # default to decompressing the response since format is fixed to 'msgpack'
      infl ||= create_inflate(res)

      inflated_fragment = infl.inflate(chunk)
      upkr.feed_each(inflated_fragment, &block)
    }
    nil
  ensure
    infl.close if infl
  end

  # block is optional and must accept 1 argument
  #
  # @param [String] job_id
  # @param [Proc] block
  # @return [nil]
  def job_result_each_with_compr_size(job_id, &block)
    upkr = MessagePack::Unpacker.new
    infl = nil

    get("/v3/job/result/#{e job_id}", {'format'=>'msgpack'}, {:resume => true}) {|res, chunk, current_total_chunk_size|
      if res.code != 200
        raise_error("Get job result failed", res)
      end

      # default to decompressing the response since format is fixed to 'msgpack'
      infl ||= create_inflate(res)

      inflated_fragment = infl.inflate(chunk)
      upkr.feed_each(inflated_fragment) {|unpacked|
        block.call(unpacked, current_total_chunk_size) if block_given?
      }
    }
    nil
  ensure
    infl.close if infl
  end

  # @param [String] job_id
  # @param [String] format
  # @return [String]
  def job_result_raw(job_id, format, io = nil, &block)
    body = nil

    get("/v3/job/result/#{e job_id}", {'format'=>format}, {:resume => true}) {|res, chunk, current_total_chunk_size|
      unless res.ok?
        raise_error("Get job result failed", res)
      end

      if io
        io.write(chunk)
        block.call(current_total_chunk_size) if block_given?
      else
        if body
          body += chunk
        else
          body = chunk
        end
      end
    }
    body
  end

  # @param [String] job_id
  # @return [String]
  def kill(job_id)
    code, body, res = post("/v3/job/kill/#{e job_id}")
    if code != "200"
      raise_error("Kill job failed", res)
    end
    js = checked_json(body, %w[])
    former_status = js['former_status']
    return former_status
  end

  # @param [String] q
  # @param [String] db
  # @param [String] result_url
  # @param [Fixnum] priority
  # @param [Hash] opts
  # @return [String] job_id
  def hive_query(q, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={})
    query(q, :hive, db, result_url, priority, retry_limit, opts)
  end

  # @param [String] q
  # @param [String] db
  # @param [String] result_url
  # @param [Fixnum] priority
  # @param [Hash] opts
  # @return [String] job_id
  def pig_query(q, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={})
    query(q, :pig, db, result_url, priority, retry_limit, opts)
  end

  # @param [String] q
  # @param [Symbol] type
  # @param [String] db
  # @param [String] result_url
  # @param [Fixnum] priority
  # @param [Hash] opts
  # @return [String] job_id
  def query(q, type=:hive, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={})
    params = {'query' => q}.merge(opts)
    params['result'] = result_url if result_url
    params['priority'] = priority if priority
    params['retry_limit'] = retry_limit if retry_limit
    code, body, res = post("/v3/job/issue/#{type}/#{e db}", params)
    if code != "200"
      raise_error("Query failed", res)
    end
    js = checked_json(body, %w[job_id])
    return js['job_id'].to_s
  end

  private

  class NullInflate
    def inflate(chunk)
      chunk
    end

    def close
    end
  end

  def create_inflalte_or_null_inflate(response)
    if response.header['Content-Encoding'].empty?
      NullInflate.new
    else
      create_inflate(response)
    end
  end

  def create_inflate(response)
    if response.header['Content-Encoding'].include?('gzip')
      Zlib::Inflate.new(Zlib::MAX_WBITS + 16)
    else
      Zlib::Inflate.new
    end
  end
end
end