lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.3.0 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.3.1
- old
+ new
@@ -58,24 +58,32 @@
@cached_client_expiration = Time.now + 1800
@client = client
end
- def create_table(project, dataset, table_id, record_schema)
+ def create_table(project, dataset, table_id, record_schema, time_partitioning_type: nil, time_partitioning_expiration: nil)
create_table_retry_limit = 3
create_table_retry_wait = 1
create_table_retry_count = 0
begin
- client.insert_table(project, dataset, {
+ definition = {
table_reference: {
table_id: table_id,
},
schema: {
fields: record_schema.to_a,
}
- }, {})
+ }
+
+ if time_partitioning_type
+ definition[:time_partitioning] = {
+ type: time_partitioning_type.to_s.upcase,
+ expiration_ms: time_partitioning_expiration ? time_partitioning_expiration * 1000 : nil
+ }.compact
+ end
+ client.insert_table(project, dataset, definition, {})
log.debug "create table", project_id: project, dataset: dataset, table: table_id
@client = nil
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
@client = nil
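With the new keyword arguments, callers can ask for a time-partitioned table. A minimal sketch of a call and the definition it produces (the writer and schema objects, all names, and the 3600-second expiration are illustrative, not defaults; the expiration is given in seconds and multiplied by 1000 before being sent):

    writer.create_table("my-project", "my_dataset", "access_log", schema,
                        time_partitioning_type: :day,        # upcased to "DAY"
                        time_partitioning_expiration: 3600)  # sent as expiration_ms: 3600000

    # Definition handed to client.insert_table in this case:
    # {
    #   table_reference: { table_id: "access_log" },
    #   schema: { fields: schema.to_a },
    #   time_partitioning: { type: "DAY", expiration_ms: 3600000 }
    # }

Note the Hash#compact: when time_partitioning_expiration is nil, the expiration_ms key is dropped entirely rather than sent as null.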
@@ -122,11 +130,11 @@
body.merge!(template_suffix: template_suffix) if template_suffix
res = client.insert_all_table_data(project, dataset, table_id, body, {
options: {timeout_sec: timeout_sec, open_timeout_sec: open_timeout_sec}
})
log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size
- log.warn "insert errors", insert_errors: res.insert_errors if res.insert_errors && !res.insert_errors.empty?
+ log.warn "insert errors", insert_errors: res.insert_errors.to_s if res.insert_errors && !res.insert_errors.empty?
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
@client = nil
reason = e.respond_to?(:reason) ? e.reason : nil
log.error "tabledata.insertAll API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
@@ -136,11 +144,11 @@
else
raise UnRetryableError.new(nil, e)
end
end
- def create_load_job(project, dataset, table_id, upload_source, job_id, fields, ignore_unknown_values: false, max_bad_records: 0, timeout_sec: nil, open_timeout_sec: 60)
+ def create_load_job(project, dataset, table_id, upload_source, job_id, fields, ignore_unknown_values: false, max_bad_records: 0, timeout_sec: nil, open_timeout_sec: 60, auto_create_table: nil, time_partitioning_type: nil, time_partitioning_expiration: nil)
configuration = {
configuration: {
load: {
destination_table: {
project_id: project,
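A hedged sketch of an invocation using the three new options (writer, upload_io, job_id, and schema are assumed stand-ins for illustration; none of these values are defaults):

    writer.create_load_job("my-project", "my_dataset", "access_log",
                           upload_io, job_id, schema,
                           auto_create_table: true,
                           time_partitioning_type: :day,
                           time_partitioning_expiration: 3600)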
@@ -155,10 +163,11 @@
ignore_unknown_values: ignore_unknown_values,
max_bad_records: max_bad_records,
}
}
}
+ configuration[:configuration][:load].merge!(create_disposition: "CREATE_NEVER") if time_partitioning_type
configuration.merge!({job_reference: {project_id: project, job_id: job_id}}) if job_id
      # If the target table already exists, omit the schema configuration,
      # because that makes schema changes easier.
begin
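Pinning create_disposition to "CREATE_NEVER" matters because the load configuration above carries no partitioning fields: a table auto-created by the load job itself would come out unpartitioned. Delegating creation to create_table, the only path here that attaches the time_partitioning spec, preserves the guarantee. Sketch of the effect:

    # Only when time_partitioning_type is set:
    configuration[:configuration][:load][:create_disposition]  # => "CREATE_NEVER"
    # Otherwise the key is absent and BigQuery's default, CREATE_IF_NEEDED, applies.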
@@ -186,10 +195,16 @@
@client = nil
reason = e.respond_to?(:reason) ? e.reason : nil
log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
- return wait_load_job(project, dataset, job_id, table_id, retryable: false) if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
+ if auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ e.message
+            # Table not found: create it automatically
+ create_table(project, dataset, table_id, fields, time_partitioning_type: time_partitioning_type, time_partitioning_expiration: time_partitioning_expiration)
+ raise "table created. send rows next time."
+ end
+
+ return wait_load_job(project, dataset, job_id, table_id) if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
if RETRYABLE_ERROR_REASON.include?(reason) || e.is_a?(Google::Apis::ServerError)
raise RetryableError.new(nil, e)
else
raise UnRetryableError.new(nil, e)
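Taken together, the new branch turns a missing destination table into a create-and-retry cycle: create_table provisions the (optionally partitioned) table, and the bare raise makes Fluentd buffer the chunk and flush it again later, as the message says. Note also that the duplicate-job path now calls wait_load_job without retryable: false, so a duplicated load job is polled with the default retry behavior. A condensed sketch of the new control flow (the job submission itself is elided between the hunks above):

    begin
      # ... submit the load job ...
    rescue Google::Apis::ClientError => e
      if auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ e.message
        # Provision the table, propagating the partitioning options, then
        # raise so the rows are re-sent on the next flush attempt.
        create_table(project, dataset, table_id, fields,
                     time_partitioning_type: time_partitioning_type,
                     time_partitioning_expiration: time_partitioning_expiration)
        raise "table created. send rows next time."
      end
    end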