lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-3.2.0 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-3.3.0

- old
+ new

@@ -99,10 +99,11 @@ raise Fluent::BigQuery::UnRetryableError.new("failed to insert into bigquery(insert errors), and cannot retry") end end end rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e + log.debug "insert error: #{e.message}", status_code: e.respond_to?(:status_code) ? e.status_code : nil, reason: e.respond_to?(:reason) ? e.reason : nil error_data = { project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message } wrapped = Fluent::BigQuery::Error.wrap(e) if wrapped.retryable? log.warn "tabledata.insertAll API", error_data else @@ -110,11 +111,11 @@ end raise wrapped end - JobReference = Struct.new(:chunk_id, :chunk_id_hex, :project_id, :dataset_id, :table_id, :job_id) do + JobReference = Struct.new(:chunk_id, :chunk_id_hex, :project_id, :dataset_id, :table_id, :job_id, :location) do def as_hash(*keys) if keys.empty? to_h else to_h.select { |k, _| keys.include?(k) } @@ -159,11 +160,11 @@ project, configuration, upload_source: upload_source, content_type: "application/octet-stream", ) - JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id) + JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id, res.job_reference.location) rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job return JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, job_id) @@ -173,10 +174,10 @@ end def fetch_load_job(job_reference) project = job_reference.project_id job_id = job_reference.job_id - location = @options[:location] + location = job_reference.location res = client.get_job(project, job_id, location: location) log.debug "load job fetched", id: job_id, state: res.status.state, **job_reference.as_hash(:project_id, :dataset_id, :table_id) if res.status.state == "DONE"