module Fluent
  module BigQuery
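    # Thin wrapper around the BigQuery v2 API used by the output plugin.
    # Handles authentication, table creation, streaming inserts
    # (tabledata.insertAll) and batch load jobs (jobs.insert / jobs.get).
    #
    # Usage sketch (option keys shown are illustrative, not exhaustive):
    #   writer = Fluent::BigQuery::Writer.new(log, :json_key, json_key: key_json)
    #   writer.insert_rows("my-project", "my_dataset", "my_table", rows, schema)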
    class Writer
      def initialize(log, auth_method, options = {})
        @auth_method = auth_method
        @scope = "https://www.googleapis.com/auth/bigquery"
        @options = options
        @log = log
        @num_errors_per_chunk = {}
      end

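      # Lazily builds and memoizes the Google::Apis::BigqueryV2::BigqueryService
      # client, applying the configured authorization and timeout options.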
      def client
        @client ||= Google::Apis::BigqueryV2::BigqueryService.new.tap do |cl|
          cl.authorization = get_auth
          cl.client_options.open_timeout_sec = @options[:open_timeout_sec] if @options[:open_timeout_sec]
          cl.client_options.read_timeout_sec = @options[:timeout_sec] if @options[:timeout_sec]
          cl.client_options.send_timeout_sec = @options[:timeout_sec] if @options[:timeout_sec]
        end
      end

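      # Creates the destination table with the given schema (including any
      # configured partitioning and clustering), retrying up to three times
      # with exponential backoff. A 409 "Already Exists" response is treated
      # as success.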
      def create_table(project, dataset, table_id, record_schema)
        create_table_retry_limit = 3
        create_table_retry_wait = 1
        create_table_retry_count = 0
        table_id = safe_table_id(table_id)

        begin
          definition = {
            table_reference: {
              table_id: table_id,
            },
            schema: {
              fields: record_schema.to_a,
            }
          }

          definition.merge!(time_partitioning: time_partitioning) if time_partitioning
          definition.merge!(clustering: clustering) if clustering
          client.insert_table(project, dataset, definition, {})
          log.debug "create table", project_id: project, dataset: dataset, table: table_id
        rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
          message = e.message
          if e.status_code == 409 && /Already Exists:/ =~ message
            log.debug "already created table", project_id: project, dataset: dataset, table: table_id
            # ignore 'Already Exists' error
            return
          end

          log.error "tables.insert API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message

          if create_table_retry_count < create_table_retry_limit
            sleep create_table_retry_wait
            create_table_retry_wait *= 2
            create_table_retry_count += 1
            retry
          else
            raise Fluent::BigQuery::UnRetryableError.new("failed to create table in bigquery", e)
          end
        end
      end

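      # Fetches the current table schema via tables.get and returns it as an
      # array of string-keyed field definitions, or nil if the API call fails.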
      def fetch_schema(project, dataset, table_id)
        res = client.get_table(project, dataset, table_id)
        schema = Fluent::BigQuery::Helper.deep_stringify_keys(res.schema.to_h[:fields])
        log.debug "Load schema from BigQuery: #{project}:#{dataset}.#{table_id} #{schema}"

        schema
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
        message = e.message
        log.error "tables.get API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message
        nil
      end

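      # Streams rows into the table via tabledata.insertAll. When
      # auto_create_table is enabled, a missing table is created and the
      # insert is retried. Row-level insert errors are logged; if
      # allow_retry_insert_errors is set they are raised as retryable or
      # unretryable depending on their reason.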
      def insert_rows(project, dataset, table_id, rows, schema, template_suffix: nil)
        body = {
          rows: rows,
          skip_invalid_rows: @options[:skip_invalid_rows],
          ignore_unknown_values: @options[:ignore_unknown_values],
        }
        body.merge!(template_suffix: template_suffix) if template_suffix

        if @options[:auto_create_table]
          res = insert_all_table_data_with_create_table(project, dataset, table_id, body, schema)
        else
          res = client.insert_all_table_data(project, dataset, table_id, body, {})
        end
        log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size

        if res.insert_errors && !res.insert_errors.empty?
          log.warn "insert errors", project_id: project, dataset: dataset, table: table_id, insert_errors: res.insert_errors.to_s
          if @options[:allow_retry_insert_errors]
            is_included_any_retryable_insert_error = res.insert_errors.any? do |insert_error|
              insert_error.errors.any? { |error| Fluent::BigQuery::Error.retryable_insert_errors_reason?(error.reason) }
            end
            if is_included_any_retryable_insert_error
              raise Fluent::BigQuery::RetryableError.new("failed to insert into bigquery(insert errors), retry")
            else
              raise Fluent::BigQuery::UnRetryableError.new("failed to insert into bigquery(insert errors), and cannot retry")
            end
          end
        end
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
        error_data = { project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message }
        wrapped = Fluent::BigQuery::Error.wrap(e)
        if wrapped.retryable?
          log.warn "tabledata.insertAll API", error_data
        else
          log.error "tabledata.insertAll API", error_data
        end

        raise wrapped
      end

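      # Identifies a submitted load job together with the buffer chunk and
      # destination table it belongs to.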
      JobReference = Struct.new(:chunk_id, :chunk_id_hex, :project_id, :dataset_id, :table_id, :job_id) do
        def as_hash(*keys)
          if keys.empty?
            to_h
          else
            to_h.select { |k, _| keys.include?(k) }
          end
        end
      end

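      # Submits a load job for the given chunk. When prevent_duplicate_load is
      # enabled, a deterministic job_id is attached so that re-submitting the
      # same chunk is rejected by BigQuery (a 409 "duplicate job" response is
      # then treated as success). If the destination table does not exist and
      # auto_create_table is set, the schema (plus any partitioning and
      # clustering settings) is embedded in the job configuration so BigQuery
      # creates the table during the load.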
      def create_load_job(chunk_id, chunk_id_hex, project, dataset, table_id, upload_source, fields)
        configuration = {
          configuration: {
            load: {
              destination_table: {
                project_id: project,
                dataset_id: dataset,
                table_id: table_id,
              },
              write_disposition: "WRITE_APPEND",
              source_format: source_format,
              ignore_unknown_values: @options[:ignore_unknown_values],
              max_bad_records: @options[:max_bad_records],
            }
          }
        }

        job_id = create_job_id(chunk_id_hex, dataset, table_id, fields.to_a) if @options[:prevent_duplicate_load]
        configuration.merge!({job_reference: {project_id: project, job_id: job_id}}) if job_id

        begin
          # Check table existence
          client.get_table(project, dataset, table_id)
        rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
          if e.status_code == 404 && /Not Found: Table/i =~ e.message
            raise Fluent::BigQuery::UnRetryableError.new("Table is not found") unless @options[:auto_create_table]
            raise Fluent::BigQuery::UnRetryableError.new("Schema is empty") if fields.empty?
            configuration[:configuration][:load].merge!(schema: {fields: fields.to_a})
            configuration[:configuration][:load].merge!(time_partitioning: time_partitioning) if time_partitioning
            configuration[:configuration][:load].merge!(clustering: clustering) if clustering
          end
        end

        res = client.insert_job(
          project,
          configuration,
          {
            upload_source: upload_source,
            content_type: "application/octet-stream",
          }
        )
        JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id)
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
        log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message

        if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
          return JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, job_id)
        end

        raise Fluent::BigQuery::Error.wrap(e)
      end

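      # Polls jobs.get for the given job reference and returns the response
      # once the job state is "DONE"; returns nil while the job is still
      # running or when a retryable API error occurs. Non-retryable API
      # errors are raised.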
      def fetch_load_job(job_reference)
        project = job_reference.project_id
        job_id = job_reference.job_id
        location = @options[:location]

        res = client.get_job(project, job_id, location: location)
        log.debug "load job fetched", id: job_id, state: res.status.state, **job_reference.as_hash(:project_id, :dataset_id, :table_id)

        if res.status.state == "DONE"
          res
        end
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
        e = Fluent::BigQuery::Error.wrap(e)
        raise e unless e.retryable?
      end

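      # Inspects a finished load job response: logs row-level errors, raises a
      # RetryableError or UnRetryableError depending on the error_result
      # reason, and otherwise logs the load statistics and clears the
      # per-chunk error counter.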
      def commit_load_job(chunk_id_hex, response)
        job_id = response.id
        project = response.configuration.load.destination_table.project_id
        dataset = response.configuration.load.destination_table.dataset_id
        table_id = response.configuration.load.destination_table.table_id

        errors = response.status.errors
        if errors
          errors.each do |e|
            log.error "job.load API (rows)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: e.message, reason: e.reason
          end
        end

        error_result = response.status.error_result
        if error_result
          log.error "job.load API (result)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: error_result.message, reason: error_result.reason
          if Fluent::BigQuery::Error.retryable_error_reason?(error_result.reason)
            @num_errors_per_chunk[chunk_id_hex] = @num_errors_per_chunk[chunk_id_hex].to_i + 1
            raise Fluent::BigQuery::RetryableError.new("failed to load into bigquery, retry")
          else
            @num_errors_per_chunk.delete(chunk_id_hex)
            raise Fluent::BigQuery::UnRetryableError.new("failed to load into bigquery, and cannot retry")
          end
        end

        # `stats` can be nil if we receive a warning like "Warning: Load job succeeded with data imported, however statistics may be lost due to internal error."
        stats = response.statistics.load
        duration = (response.statistics.end_time - response.statistics.creation_time) / 1000.0
        log.debug "load job finished", id: job_id, state: response.status.state, input_file_bytes: stats&.input_file_bytes, input_files: stats&.input_files, output_bytes: stats&.output_bytes, output_rows: stats&.output_rows, bad_records: stats&.bad_records, duration: duration.round(2), project_id: project, dataset: dataset, table: table_id
        @num_errors_per_chunk.delete(chunk_id_hex)
      end

      private

      def log
        @log
      end

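      # Builds the authorization object for the configured auth method.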
      def get_auth
        case @auth_method
        when :private_key
          get_auth_from_private_key
        when :compute_engine
          get_auth_from_compute_engine
        when :json_key
          get_auth_from_json_key
        when :application_default
          get_auth_from_application_default
        else
          raise ConfigError, "Unknown auth method: #{@auth_method}"
        end
      end

      def get_auth_from_private_key
        require 'google/api_client/auth/key_utils'
        private_key_path = @options[:private_key_path]
        private_key_passphrase = @options[:private_key_passphrase]
        email = @options[:email]

        key = Google::APIClient::KeyUtils.load_from_pkcs12(private_key_path, private_key_passphrase)
        Signet::OAuth2::Client.new(
          token_credential_uri: "https://accounts.google.com/o/oauth2/token",
          audience: "https://accounts.google.com/o/oauth2/token",
          scope: @scope,
          issuer: email,
          signing_key: key
        )
      end

      def get_auth_from_compute_engine
        Google::Auth::GCECredentials.new
      end

      def get_auth_from_json_key
        json_key = @options[:json_key]

        begin
          # The json_key option may hold the key content itself...
          JSON.parse(json_key)
          key = StringIO.new(json_key)
          Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: key, scope: @scope)
        rescue JSON::ParserError
          # ...or a path to a JSON key file.
          File.open(json_key) do |f|
            Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: f, scope: @scope)
          end
        end
      end

      def get_auth_from_application_default
        Google::Auth.get_application_default([@scope])
      end

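      # Strips a partition decorator suffix (e.g. "$20201231") from the table id.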
      def safe_table_id(table_id)
        table_id.gsub(/\$\d+$/, "")
      end

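      # Derives a deterministic job id from the chunk, destination and schema
      # so that retried submissions of the same chunk share the same job id.
      # The per-chunk error count is mixed in so that a fresh id is generated
      # after a failed load.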
      def create_job_id(chunk_id_hex, dataset, table, schema)
        job_id_key = "#{chunk_id_hex}#{dataset}#{table}#{schema}#{@options[:max_bad_records]}#{@options[:ignore_unknown_values]}#{@num_errors_per_chunk[chunk_id_hex]}"
        @log.debug "job_id_key: #{job_id_key}"
        "fluentd_job_" + Digest::SHA1.hexdigest(job_id_key)
      end

      def source_format
        case @options[:source_format]
        when :json
          "NEWLINE_DELIMITED_JSON"
        when :avro
          "AVRO"
        when :csv
          "CSV"
        else
          "NEWLINE_DELIMITED_JSON"
        end
      end

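      # Builds the time_partitioning clause from the configured options,
      # memoizing the result. Returns nil when partitioning is not configured.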
      def time_partitioning
        return @time_partitioning if instance_variable_defined?(:@time_partitioning)

        if @options[:time_partitioning_type]
          @time_partitioning = {
            type: @options[:time_partitioning_type].to_s.upcase,
            field: @options[:time_partitioning_field] ? @options[:time_partitioning_field].to_s : nil,
            expiration_ms: @options[:time_partitioning_expiration] ? @options[:time_partitioning_expiration] * 1000 : nil,
          }.reject { |_, v| v.nil? }
        else
          @time_partitioning
        end
      end

      def clustering
        return @clustering if instance_variable_defined?(:@clustering)

        if @options[:clustering_fields]
          @clustering = {
            fields: @options[:clustering_fields]
          }
        else
          @clustering
        end
      end

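      # Performs a streaming insert, creating the table on a 404 "Not Found"
      # and retrying a bounded number of times until the new table becomes
      # visible to the streaming API.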
      def insert_all_table_data_with_create_table(project, dataset, table_id, body, schema)
        try_count ||= 1
        res = client.insert_all_table_data(project, dataset, table_id, body, {})
      rescue Google::Apis::ClientError => e
        if e.status_code == 404 && /Not Found: Table/i =~ e.message
          if try_count == 1
            # Table Not Found: Auto Create Table
            create_table(project, dataset, table_id, schema)
          elsif try_count > 10
            raise "A new table was created but it is not found."
          end

          # Retry to insert several times because the created table is not visible from Streaming insert for a little while
          # cf. https://cloud.google.com/bigquery/troubleshooting-errors#metadata-errors-for-streaming-inserts
          try_count += 1
          sleep 5
          log.debug "Retry to insert rows", project_id: project, dataset: dataset, table: table_id
          retry
        end
        raise
      end
    end
  end
end