lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-2.1.0 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-2.2.0
- old
+ new
@@ -33,10 +33,11 @@
fields: record_schema.to_a,
}
}
definition.merge!(time_partitioning: time_partitioning) if time_partitioning
+ definition.merge!(clustering: clustering) if clustering
client.insert_table(project, dataset, definition, {})
log.debug "create table", project_id: project, dataset: dataset, table: table_id
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
message = e.message
if e.status_code == 409 && /Already Exists:/ =~ message
@@ -147,10 +148,11 @@
if e.status_code == 404 && /Not Found: Table/i =~ e.message
raise Fluent::BigQuery::UnRetryableError.new("Table is not found") unless @options[:auto_create_table]
raise Fluent::BigQuery::UnRetryableError.new("Schema is empty") if fields.empty?
configuration[:configuration][:load].merge!(schema: {fields: fields.to_a})
configuration[:configuration][:load].merge!(time_partitioning: time_partitioning) if time_partitioning
+ configuration[:configuration][:load].merge!(clustering: clustering) if clustering
end
end
res = client.insert_job(
project,
@@ -172,12 +174,13 @@
end
def fetch_load_job(job_reference)
project = job_reference.project_id
job_id = job_reference.job_id
+ location = @options[:location]
- res = client.get_job(project, job_id)
+ res = client.get_job(project, job_id, location: location)
log.debug "load job fetched", id: job_id, state: res.status.state, **job_reference.as_hash(:project_id, :dataset_id, :table_id)
if res.status.state == "DONE"
res
end
@@ -307,13 +310,24 @@
if @options[:time_partitioning_type]
@time_partitioning = {
type: @options[:time_partitioning_type].to_s.upcase,
field: @options[:time_partitioning_field] ? @options[:time_partitioning_field].to_s : nil,
expiration_ms: @options[:time_partitioning_expiration] ? @options[:time_partitioning_expiration] * 1000 : nil,
- require_partition_filter: @options[:time_partitioning_require_partition_filter],
}.reject { |_, v| v.nil? }
else
@time_partitioning
+ end
+ end
+
+ def clustering
+ return @clustering if instance_variable_defined?(:@clustering)
+
+ if @options[:clustering_fields]
+ @clustering = {
+ fields: @options[:clustering_fields]
+ }
+ else
+ @clustering
end
end
def insert_all_table_data_with_create_table(project, dataset, table_id, body, schema)
try_count ||= 1