lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-2.0.0 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-2.1.0
- old
+ new
@@ -32,31 +32,24 @@
schema: {
fields: record_schema.to_a,
}
}
- if @options[:time_partitioning_type]
- definition[:time_partitioning] = {
- type: @options[:time_partitioning_type].to_s.upcase,
- field: @options[:time_partitioning_field] ? @options[:time_partitioning_field].to_s : nil,
- expiration_ms: @options[:time_partitioning_expiration] ? @options[:time_partitioning_expiration] * 1000 : nil
- }.select { |_, value| !value.nil? }
- end
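+ # Partitioning options now come from the shared time_partitioning helper defined at the bottom of this class.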
+ definition.merge!(time_partitioning: time_partitioning) if time_partitioning
client.insert_table(project, dataset, definition, {})
log.debug "create table", project_id: project, dataset: dataset, table: table_id
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
message = e.message
if e.status_code == 409 && /Already Exists:/ =~ message
log.debug "already created table", project_id: project, dataset: dataset, table: table_id
# ignore 'Already Exists' error
return
end
- reason = e.respond_to?(:reason) ? e.reason : nil
- log.error "tables.insert API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message, reason: reason
+ log.error "tables.insert API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message
- if Fluent::BigQuery::Error.retryable_error_reason?(reason) && create_table_retry_count < create_table_retry_limit
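+ # Any tables.insert failure is now retried up to the limit with exponential backoff, not only "retryable" reasons.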
+ if create_table_retry_count < create_table_retry_limit
sleep create_table_retry_wait
create_table_retry_wait *= 2
create_table_retry_count += 1
retry
else
@@ -75,18 +68,23 @@
message = e.message
log.error "tables.get API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message
nil
end
- def insert_rows(project, dataset, table_id, rows, template_suffix: nil)
+ def insert_rows(project, dataset, table_id, rows, schema, template_suffix: nil)
body = {
rows: rows,
skip_invalid_rows: @options[:skip_invalid_rows],
ignore_unknown_values: @options[:ignore_unknown_values],
}
body.merge!(template_suffix: template_suffix) if template_suffix
- res = client.insert_all_table_data(project, dataset, table_id, body, {})
+
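+ # With auto_create_table, a missing table is created on the fly and the insert retried (see insert_all_table_data_with_create_table below).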
+ if @options[:auto_create_table]
+ res = insert_all_table_data_with_create_table(project, dataset, table_id, body, schema)
+ else
+ res = client.insert_all_table_data(project, dataset, table_id, body, {})
+ end
log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size
if res.insert_errors && !res.insert_errors.empty?
log.warn "insert errors", project_id: project, dataset: dataset, table: table_id, insert_errors: res.insert_errors.to_s
if @options[:allow_retry_insert_errors]
@@ -99,12 +97,11 @@
raise Fluent::BigQuery::UnRetryableError.new("failed to insert into bigquery(insert errors), and cannot retry")
end
end
end
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
- reason = e.respond_to?(:reason) ? e.reason : nil
- error_data = { project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason }
+ error_data = { project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message }
wrapped = Fluent::BigQuery::Error.wrap(e)
if wrapped.retryable?
log.warn "tabledata.insertAll API", error_data
else
log.error "tabledata.insertAll API", error_data
@@ -130,33 +127,31 @@
destination_table: {
project_id: project,
dataset_id: dataset,
table_id: table_id,
},
- schema: {
- fields: fields.to_a,
- },
write_disposition: "WRITE_APPEND",
source_format: source_format,
ignore_unknown_values: @options[:ignore_unknown_values],
max_bad_records: @options[:max_bad_records],
}
}
}
job_id = create_job_id(chunk_id_hex, dataset, table_id, fields.to_a) if @options[:prevent_duplicate_load]
- configuration[:configuration][:load].merge!(create_disposition: "CREATE_NEVER") if @options[:time_partitioning_type]
configuration.merge!({job_reference: {project_id: project, job_id: job_id}}) if job_id
- # If the target table already exists, omit the schema configuration,
- # because that makes schema changes easier.
begin
- if client.get_table(project, dataset, table_id)
- configuration[:configuration][:load].delete(:schema)
+ # Check whether the table already exists
+ client.get_table(project, dataset, table_id)
+ rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
+ if e.status_code == 404 && /Not Found: Table/i =~ e.message
+ raise Fluent::BigQuery::UnRetryableError.new("Table is not found") unless @options[:auto_create_table]
+ raise Fluent::BigQuery::UnRetryableError.new("Schema is empty") if fields.empty?
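+ # Attach the schema (and partitioning) so the load job creates the table itself; create_disposition defaults to CREATE_IF_NEEDED.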
+ configuration[:configuration][:load].merge!(schema: {fields: fields.to_a})
+ configuration[:configuration][:load].merge!(time_partitioning: time_partitioning) if time_partitioning
end
- rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError
- raise Fluent::BigQuery::UnRetryableError.new("Schema is empty") if fields.empty?
end
res = client.insert_job(
project,
configuration,
@@ -165,24 +160,12 @@
content_type: "application/octet-stream",
}
)
JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
- reason = e.respond_to?(:reason) ? e.reason : nil
- log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
+ log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message
- if @options[:auto_create_table] && e.status_code == 404 && /Not Found: Table/i =~ e.message
- # Table Not Found: Auto Create Table
- create_table(
- project,
- dataset,
- table_id,
- fields,
- )
- raise "table created. send rows next time."
- end
-
if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
return JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, job_id)
end
raise Fluent::BigQuery::Error.wrap(e)
@@ -314,9 +297,46 @@
when :csv
"CSV"
else
"NEWLINE_DELIMITED_JSON"
end
+ end
+
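+ # Memoizes the time-partitioning options derived from @options; returns nil when time_partitioning_type is unset.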
+ def time_partitioning
+ return @time_partitioning if instance_variable_defined?(:@time_partitioning)
+
+ if @options[:time_partitioning_type]
+ @time_partitioning = {
+ type: @options[:time_partitioning_type].to_s.upcase,
+ field: @options[:time_partitioning_field] ? @options[:time_partitioning_field].to_s : nil,
+ expiration_ms: @options[:time_partitioning_expiration] ? @options[:time_partitioning_expiration] * 1000 : nil,
+ require_partition_filter: @options[:time_partitioning_require_partition_filter],
+ }.reject { |_, v| v.nil? }
+ else
+ @time_partitioning
+ end
+ end
+
+ def insert_all_table_data_with_create_table(project, dataset, table_id, body, schema)
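+ # try_count ||= 1 survives the retry below, since retry re-runs the method body and ||= keeps the incremented value.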
+ try_count ||= 1
+ res = client.insert_all_table_data(project, dataset, table_id, body, {})
+ rescue Google::Apis::ClientError => e
+ if e.status_code == 404 && /Not Found: Table/i =~ e.message
+ if try_count == 1
+ # Table Not Found: Auto Create Table
+ create_table(project, dataset, table_id, schema)
+ elsif try_count > 10
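+ # Give up after roughly ten retries (about 50 seconds of waiting).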
+ raise "A new table was created but it is not found."
+ end
+
+ # Retry the insert several times, because a newly created table is not visible to streaming inserts for a short while
+ # cf. https://cloud.google.com/bigquery/troubleshooting-errors#metadata-errors-for-streaming-inserts
+ try_count += 1
+ sleep 5
+ log.debug "Retry to insert rows", project_id: project, dataset: dataset, table: table_id
+ retry
+ end
+ raise
end
end
end
end