lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-3.2.0 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-3.3.0
- old (removed in 3.3.0)
+ new (added in 3.3.0)
@@ -99,10 +99,11 @@
raise Fluent::BigQuery::UnRetryableError.new("failed to insert into bigquery(insert errors), and cannot retry")
end
end
end
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
+ log.debug "insert error: #{e.message}", status_code: e.respond_to?(:status_code) ? e.status_code : nil, reason: e.respond_to?(:reason) ? e.reason : nil
error_data = { project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message }
wrapped = Fluent::BigQuery::Error.wrap(e)
if wrapped.retryable?
log.warn "tabledata.insertAll API", error_data
else
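3.3.0 adds the log.debug call above so the raw status code and reason of an insertAll failure are visible at debug level before the error is wrapped. The attribute reads are guarded with respond_to?, so error objects that do not expose status_code or reason just log nil instead of raising NoMethodError. A minimal, self-contained illustration of that guard pattern, using stand-in error classes rather than the google-api-client ones:

    # Stand-in error classes; only the first exposes status_code/reason,
    # mirroring the difference between HTTP-level and other exceptions.
    class HttpishError < StandardError
      def status_code; 503; end
      def reason; "backendError"; end
    end

    class BareError < StandardError; end

    [HttpishError.new("boom"), BareError.new("boom")].each do |e|
      status = e.respond_to?(:status_code) ? e.status_code : nil
      reason = e.respond_to?(:reason) ? e.reason : nil
      puts "insert error: #{e.message} status_code=#{status.inspect} reason=#{reason.inspect}"
    end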
@@ -110,11 +111,11 @@
end
raise wrapped
end
- JobReference = Struct.new(:chunk_id, :chunk_id_hex, :project_id, :dataset_id, :table_id, :job_id) do
+ JobReference = Struct.new(:chunk_id, :chunk_id_hex, :project_id, :dataset_id, :table_id, :job_id, :location) do
def as_hash(*keys)
if keys.empty?
to_h
else
to_h.select { |k, _| keys.include?(k) }
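The only change in this hunk is the new :location member at the end of the struct. Because as_hash is built on to_h, the location travels with every job reference and can be selected like any other key. A stand-alone copy of the struct as defined above, with made-up values, shows the effect:

    JobReference = Struct.new(:chunk_id, :chunk_id_hex, :project_id, :dataset_id, :table_id, :job_id, :location) do
      def as_hash(*keys)
        keys.empty? ? to_h : to_h.select { |k, _| keys.include?(k) }
      end
    end

    ref = JobReference.new(nil, "abc123", "my-project", "my_dataset", "events", "fluentd_job_abc123", "asia-northeast1")
    p ref.location                                      # => "asia-northeast1"
    p ref.as_hash(:project_id, :dataset_id, :table_id)  # => {:project_id=>"my-project", :dataset_id=>"my_dataset", :table_id=>"events"}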
@@ -159,11 +160,11 @@
project,
configuration,
upload_source: upload_source,
content_type: "application/octet-stream",
)
- JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id)
+ JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id, res.job_reference.location)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message
if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
return JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, job_id)
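Here the location BigQuery reports for the newly created load job is captured on the JobReference. Note that the duplicate-job fallback directly above still calls JobReference.new with six arguments, so location stays nil on that path. The real response from insert_job is a Google::Apis::BigqueryV2::Job whose job_reference should expose project_id, job_id and location; a rough sketch with a faked response object (real API calls omitted):

    FakeJobReference = Struct.new(:project_id, :job_id, :location)
    FakeJob          = Struct.new(:job_reference)

    res = FakeJob.new(FakeJobReference.new("my-project", "fluentd_job_abc123", "us-central1"))
    puts "load job #{res.job_reference.job_id} created in #{res.job_reference.location}"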
@@ -173,10 +174,10 @@
end
def fetch_load_job(job_reference)
project = job_reference.project_id
job_id = job_reference.job_id
- location = @options[:location]
+ location = job_reference.location
res = client.get_job(project, job_id, location: location)
log.debug "load job fetched", id: job_id, state: res.status.state, **job_reference.as_hash(:project_id, :dataset_id, :table_id)
if res.status.state == "DONE"
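Previously the location used for the job lookup came from the plugin's configured @options[:location]; 3.3.0 instead uses the location the job was actually created in, as recorded on the JobReference above, so get_job polls the right region even if the location option was left unset. A hedged sketch of the same lookup done with google-api-client directly (project, job id and location are placeholders, and credential setup may differ in your environment):

    require "googleauth"
    require "google/apis/bigquery_v2"

    client = Google::Apis::BigqueryV2::BigqueryService.new
    client.authorization = Google::Auth.get_application_default(
      ["https://www.googleapis.com/auth/bigquery"]
    )

    # Poll the load job in the region BigQuery assigned to it.
    res = client.get_job("my-project", "fluentd_job_abc123", location: "asia-northeast1")
    puts res.status.state  # "PENDING", "RUNNING" or "DONE"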