lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.4.0 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.4.1

- old
+ new

@@ -82,22 +82,35 @@ message = e.message log.error "tables.get API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message nil end - def insert_rows(project, dataset, table_id, rows, skip_invalid_rows: false, ignore_unknown_values: false, template_suffix: nil, timeout_sec: nil, open_timeout_sec: 60) + def insert_rows(project, dataset, table_id, rows, skip_invalid_rows: false, ignore_unknown_values: false, template_suffix: nil, timeout_sec: nil, open_timeout_sec: 60, allow_retry_insert_errors: false) body = { rows: rows, skip_invalid_rows: skip_invalid_rows, ignore_unknown_values: ignore_unknown_values, } body.merge!(template_suffix: template_suffix) if template_suffix res = client.insert_all_table_data(project, dataset, table_id, body, { options: {timeout_sec: timeout_sec, open_timeout_sec: open_timeout_sec} }) log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size - log.warn "insert errors", project_id: project, dataset: dataset, table: table_id, insert_errors: res.insert_errors.to_s if res.insert_errors && !res.insert_errors.empty? + + if res.insert_errors && !res.insert_errors.empty? + log.warn "insert errors", project_id: project, dataset: dataset, table: table_id, insert_errors: res.insert_errors.to_s + if allow_retry_insert_errors + is_included_any_retryable_insert_error = res.insert_errors.any? do |insert_error| + insert_error.errors.any? { |error| Fluent::BigQuery::Error.retryable_insert_errors_reason?(error.reason) } + end + if is_included_any_retryable_insert_error + raise Fluent::BigQuery::RetryableError.new("failed to insert into bigquery(insert errors), retry") + else + raise Fluent::BigQuery::UnRetryableError.new("failed to insert into bigquery(insert errors), and cannot retry") + end + end + end rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e @client = nil reason = e.respond_to?(:reason) ? e.reason : nil log.error "tabledata.insertAll API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason @@ -164,10 +177,10 @@ create_table(project, dataset, table_id, fields, time_partitioning_type: time_partitioning_type, time_partitioning_expiration: time_partitioning_expiration) raise "table created. send rows next time." end if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job - wait_load_job(chunk_id, project, dataset, job_id, table_id) + wait_load_job(chunk_id, project, dataset, job_id, table_id) @num_errors_per_chunk.delete(chunk_id) return end raise Fluent::BigQuery::Error.wrap(e)