lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.4.0 vs lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.4.1
- old
+ new
@@ -192,10 +192,14 @@
allow_quoted_newlines: @task['allow_quoted_newlines'],
}
}
}
+ if @task['schema_update_options']
+ body[:configuration][:load][:schema_update_options] = @task['schema_update_options']
+ end
+
opts = {
upload_source: path,
content_type: "application/octet-stream",
# options: {
# retries: @task['retries'],
@@ -252,10 +256,14 @@
},
}
}
}
+ if @task['schema_update_options']
+ body[:configuration][:load][:schema_update_options] = @task['schema_update_options']
+ end
+
opts = {}
Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
response = with_network_retry { client.insert_job(@project, body, opts) }
wait_load('Copy', response)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
@@ -369,28 +377,36 @@
}
raise Error, "failed to get dataset #{@project}:#{dataset}, response:#{response}"
end
end
- def create_table(table, dataset: nil, options: {})
+ def create_table(table, dataset: nil, options: nil)
begin
- table = Helper.chomp_partition_decorator(table)
dataset ||= @dataset
+ options ||= {}
+ options['time_partitioning'] ||= @task['time_partitioning']
+ if Helper.has_partition_decorator?(table)
+ options['time_partitioning'] ||= {'type' => 'DAY'}
+ table = Helper.chomp_partition_decorator(table)
+ end
+
Embulk.logger.info { "embulk-output-bigquery: Create table... #{@project}:#{dataset}.#{table}" }
body = {
table_reference: {
table_id: table,
},
schema: {
fields: fields,
}
}
+
if options['time_partitioning']
body[:time_partitioning] = {
type: options['time_partitioning']['type'],
expiration_ms: options['time_partitioning']['expiration_ms'],
}
end
+
opts = {}
Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{body}, #{opts})" }
with_network_retry { client.insert_table(@project, dataset, body, opts) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 409 && /Already Exists:/ =~ e.message