lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.4.0 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.4.1
- old
+ new
@@ -82,22 +82,35 @@
message = e.message
log.error "tables.get API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message
nil
end
- def insert_rows(project, dataset, table_id, rows, skip_invalid_rows: false, ignore_unknown_values: false, template_suffix: nil, timeout_sec: nil, open_timeout_sec: 60)
+ def insert_rows(project, dataset, table_id, rows, skip_invalid_rows: false, ignore_unknown_values: false, template_suffix: nil, timeout_sec: nil, open_timeout_sec: 60, allow_retry_insert_errors: false)
body = {
rows: rows,
skip_invalid_rows: skip_invalid_rows,
ignore_unknown_values: ignore_unknown_values,
}
body.merge!(template_suffix: template_suffix) if template_suffix
res = client.insert_all_table_data(project, dataset, table_id, body, {
options: {timeout_sec: timeout_sec, open_timeout_sec: open_timeout_sec}
})
log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size
- log.warn "insert errors", project_id: project, dataset: dataset, table: table_id, insert_errors: res.insert_errors.to_s if res.insert_errors && !res.insert_errors.empty?
+
+ if res.insert_errors && !res.insert_errors.empty?
+ log.warn "insert errors", project_id: project, dataset: dataset, table: table_id, insert_errors: res.insert_errors.to_s
+ if allow_retry_insert_errors
+ is_included_any_retryable_insert_error = res.insert_errors.any? do |insert_error|
+ insert_error.errors.any? { |error| Fluent::BigQuery::Error.retryable_insert_errors_reason?(error.reason) }
+ end
+ if is_included_any_retryable_insert_error
+ raise Fluent::BigQuery::RetryableError.new("failed to insert into bigquery(insert errors), retry")
+ else
+ raise Fluent::BigQuery::UnRetryableError.new("failed to insert into bigquery(insert errors), and cannot retry")
+ end
+ end
+ end
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
@client = nil
reason = e.respond_to?(:reason) ? e.reason : nil
log.error "tabledata.insertAll API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
@@ -164,10 +177,10 @@
create_table(project, dataset, table_id, fields, time_partitioning_type: time_partitioning_type, time_partitioning_expiration: time_partitioning_expiration)
raise "table created. send rows next time."
end
if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
- wait_load_job(chunk_id, project, dataset, job_id, table_id)
+ wait_load_job(chunk_id, project, dataset, job_id, table_id)
@num_errors_per_chunk.delete(chunk_id)
return
end
raise Fluent::BigQuery::Error.wrap(e)