lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.3.4 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.4.0
- old
+ new
@@ -4,10 +4,11 @@
      def initialize(log, auth_method, auth_options = {})
        @auth_method = auth_method
        @scope = "https://www.googleapis.com/auth/bigquery"
        @auth_options = auth_options
        @log = log
+       @num_errors_per_chunk = {}

        @cached_client_expiration = Time.now + 1800
      end

      def client
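
The new @num_errors_per_chunk hash is a per-chunk counter of retryable load failures, keyed by Fluentd's buffer chunk id. A minimal sketch of the bookkeeping it performs (the chunk id value is illustrative):

    num_errors_per_chunk = {}
    chunk_id = "chunk-1"                          # illustrative chunk id
    num_errors_per_chunk[chunk_id] = num_errors_per_chunk[chunk_id].to_i + 1
                                                  # nil.to_i == 0, so the first failure stores 1
    num_errors_per_chunk.delete(chunk_id)         # cleared once the chunk finally loads
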
@@ -102,11 +103,11 @@
log.error "tabledata.insertAll API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
raise Fluent::BigQuery::Error.wrap(e)
end
- def create_load_job(project, dataset, table_id, upload_source, job_id, fields, ignore_unknown_values: false, max_bad_records: 0, timeout_sec: nil, open_timeout_sec: 60, auto_create_table: nil, time_partitioning_type: nil, time_partitioning_expiration: nil)
+ def create_load_job(chunk_id, project, dataset, table_id, upload_source, fields, prevent_duplicate_load: false, ignore_unknown_values: false, max_bad_records: 0, timeout_sec: nil, open_timeout_sec: 60, auto_create_table: nil, time_partitioning_type: nil, time_partitioning_expiration: nil)
configuration = {
configuration: {
load: {
destination_table: {
project_id: project,
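
Callers no longer pass a precomputed job_id; they pass the chunk id first and opt into deduplication with prevent_duplicate_load:. A hypothetical call site (all names besides the keyword arguments are illustrative):

    writer.create_load_job(
      chunk.unique_id,                 # chunk_id: the stable per-chunk key
      "my-project", "my_dataset", "access_log",
      upload_source, fields,
      prevent_duplicate_load: true,    # derive the job id from the chunk so replays collide
      ignore_unknown_values: false,
      max_bad_records: 0
    )
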
@@ -121,10 +122,12 @@
              ignore_unknown_values: ignore_unknown_values,
              max_bad_records: max_bad_records,
            }
          }
        }
+
+       job_id = create_job_id(chunk_id, dataset, table_id, fields.to_a, max_bad_records, ignore_unknown_values) if prevent_duplicate_load
        configuration[:configuration][:load].merge!(create_disposition: "CREATE_NEVER") if time_partitioning_type
        configuration.merge!({job_reference: {project_id: project, job_id: job_id}}) if job_id

        # If the target table already exists, omit the schema configuration,
        # because that makes changing the schema easier.
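
When prevent_duplicate_load is set, the derived id is attached as a job_reference; BigQuery refuses to run two jobs with the same id in one project, so a replayed chunk is rejected instead of loaded twice. A sketch of the resulting request fragment (values illustrative):

    # Illustrative fragment of the final insert_job configuration:
    configuration[:job_reference] = {
      project_id: "my-project",
      job_id: "fluentd_job_0a1b2c..."  # SHA1 over chunk id + load settings + error count
    }
    # A second insert with this id comes back as HTTP 409, handled in the rescue below.
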
@@ -146,11 +149,12 @@
              timeout_sec: timeout_sec,
              open_timeout_sec: open_timeout_sec,
            }
          }
        )
-       wait_load_job(project, dataset, res.job_reference.job_id, table_id)
+       wait_load_job(chunk_id, project, dataset, res.job_reference.job_id, table_id)
+       @num_errors_per_chunk.delete(chunk_id)
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
        @client = nil

        reason = e.respond_to?(:reason) ? e.reason : nil
        log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
@@ -159,16 +163,20 @@
          # Table Not Found: Auto Create Table
          create_table(project, dataset, table_id, fields, time_partitioning_type: time_partitioning_type, time_partitioning_expiration: time_partitioning_expiration)
          raise "table created. send rows next time."
        end

-       return wait_load_job(project, dataset, job_id, table_id) if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
+       if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
+         wait_load_job(chunk_id, project, dataset, job_id, table_id)
+         @num_errors_per_chunk.delete(chunk_id)
+         return
+       end

        raise Fluent::BigQuery::Error.wrap(e)
      end

-     def wait_load_job(project, dataset, job_id, table_id, retryable: true)
+     def wait_load_job(chunk_id, project, dataset, job_id, table_id)
        wait_interval = 10
        _response = client.get_job(project, job_id)

        until _response.status.state == "DONE"
          log.debug "wait for load job finish", state: _response.status.state, job_id: _response.job_reference.job_id
@@ -184,13 +192,15 @@
        end

        error_result = _response.status.error_result
        if error_result
          log.error "job.insert API (result)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: error_result.message, reason: error_result.reason
-         if retryable && Fluent::BigQuery::Error.retryable_error_reason?(error_result.reason)
+         if Fluent::BigQuery::Error.retryable_error_reason?(error_result.reason)
+           @num_errors_per_chunk[chunk_id] = @num_errors_per_chunk[chunk_id].to_i + 1
            raise Fluent::BigQuery::RetryableError.new("failed to load into bigquery, retry")
          else
+           @num_errors_per_chunk.delete(chunk_id)
            raise Fluent::BigQuery::UnRetryableError.new("failed to load into bigquery, and cannot retry")
          end
        end

        log.debug "finish load job", state: _response.status.state
@@ -256,9 +266,15 @@
        Google::Auth.get_application_default([@scope])
      end

      def safe_table_id(table_id)
        table_id.gsub(/\$\d+$/, "")
      end
+
+     def create_job_id(chunk_id, dataset, table, schema, max_bad_records, ignore_unknown_values)
+       job_id_key = "#{chunk_id}#{dataset}#{table}#{schema.to_s}#{max_bad_records}#{ignore_unknown_values}#{@num_errors_per_chunk[chunk_id]}"
+       @log.debug "job_id_key: #{job_id_key}"
+       "fluentd_job_" + Digest::SHA1.hexdigest(job_id_key)
      end
    end
  end
end
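
Conversely, identical inputs reproduce the identical id, which is what turns an accidental double flush into a harmless 409. For example:

    require "digest/sha1"

    key = "chunk-1my_datasetaccess_log[]0false"  # chunk_id + dataset + table + schema + options, as above
    a = "fluentd_job_" + Digest::SHA1.hexdigest(key)
    b = "fluentd_job_" + Digest::SHA1.hexdigest(key)
    a == b  # => true; the duplicate insert is rejected and the writer just waits on the first job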