lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.3.7 vs lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.4.0
- old
+ new
@@ -15,10 +15,18 @@
@schema = schema
reset_fields(fields) if fields
@project = @task['project']
@dataset = @task['dataset']
+
+ @task['source_format'] ||= 'CSV'
+ @task['max_bad_records'] ||= 0
+ @task['field_delimiter'] ||= ','
+ @task['source_format'] == 'CSV' ? @task['field_delimiter'] : nil
+ @task['encoding'] ||= 'UTF-8'
+ @task['ignore_unknown_values'] = false if @task['ignore_unknown_values'].nil?
+ @task['allow_quoted_newlines'] = false if @task['allow_quoted_newlines'].nil?
end
def fields
return @fields if @fields
if @task['schema_file']
@@ -141,11 +149,11 @@
responses[idx] = response
end
responses
end
- def load(path, table)
+ def load(path, table, write_disposition: 'WRITE_APPEND')
with_job_retry do
begin
if File.exist?(path)
# As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says,
# we should generate job_id in client code, otherwise, retrying would cause duplication
@@ -173,11 +181,11 @@
table_id: table,
},
schema: {
fields: fields,
},
- write_disposition: 'WRITE_APPEND',
+ write_disposition: write_disposition,
source_format: @task['source_format'],
max_bad_records: @task['max_bad_records'],
field_delimiter: @task['source_format'] == 'CSV' ? @task['field_delimiter'] : nil,
encoding: @task['encoding'],
ignore_unknown_values: @task['ignore_unknown_values'],
@@ -231,19 +239,19 @@
configuration: {
copy: {
create_deposition: 'CREATE_IF_NEEDED',
write_disposition: write_disposition,
source_table: {
- project_id: @project,
- dataset_id: @dataset,
- table_id: source_table,
- },
- destination_table: {
- project_id: @project,
- dataset_id: destination_dataset,
- table_id: destination_table,
- },
+ project_id: @project,
+ dataset_id: @dataset,
+ table_id: source_table,
+ },
+ destination_table: {
+ project_id: @project,
+ dataset_id: destination_dataset,
+ table_id: destination_table,
+ },
}
}
}
opts = {}
@@ -361,69 +369,96 @@
}
raise Error, "failed to get dataset #{@project}:#{dataset}, response:#{response}"
end
end
- def create_table(table)
+ def create_table(table, dataset: nil, options: {})
begin
- Embulk.logger.info { "embulk-output-bigquery: Create table... #{@project}:#{@dataset}.#{table}" }
+ table = Helper.chomp_partition_decorator(table)
+ dataset ||= @dataset
+ Embulk.logger.info { "embulk-output-bigquery: Create table... #{@project}:#{dataset}.#{table}" }
body = {
table_reference: {
table_id: table,
},
schema: {
fields: fields,
}
}
+ if options['time_partitioning']
+ body[:time_partitioning] = {
+ type: options['time_partitioning']['type'],
+ expiration_ms: options['time_partitioning']['expiration_ms'],
+ }
+ end
opts = {}
- Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@project}, #{@dataset}, #{body}, #{opts})" }
- with_network_retry { client.insert_table(@project, @dataset, body, opts) }
+ Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{body}, #{opts})" }
+ with_network_retry { client.insert_table(@project, dataset, body, opts) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 409 && /Already Exists:/ =~ e.message
# ignore 'Already Exists' error
return
end
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
- "embulk-output-bigquery: insert_table(#{@project}, #{@dataset}, #{body}, #{opts}), response:#{response}"
+ "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{body}, #{opts}), response:#{response}"
}
- raise Error, "failed to create table #{@project}:#{@dataset}.#{table}, response:#{response}"
+ raise Error, "failed to create table #{@project}:#{dataset}.#{table}, response:#{response}"
end
end
- def delete_table(table)
+ def delete_table(table, dataset: nil)
begin
- Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@project}:#{@dataset}.#{table}" }
- with_network_retry { client.delete_table(@project, @dataset, table) }
+ table = Helper.chomp_partition_decorator(table)
+ dataset ||= @dataset
+ Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@project}:#{dataset}.#{table}" }
+ with_network_retry { client.delete_table(@project, dataset, table) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 404 && /Not found:/ =~ e.message
# ignore 'Not Found' error
return
end
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
- "embulk-output-bigquery: delete_table(#{@project}, #{@dataset}, #{table}), response:#{response}"
+ "embulk-output-bigquery: delete_table(#{@project}, #{dataset}, #{table}), response:#{response}"
}
- raise Error, "failed to delete table #{@project}:#{@dataset}.#{table}, response:#{response}"
+ raise Error, "failed to delete table #{@project}:#{dataset}.#{table}, response:#{response}"
end
end
- def get_table(table)
+ def get_table(table, dataset: nil)
begin
- Embulk.logger.info { "embulk-output-bigquery: Get table... #{@project}:#{@dataset}.#{table}" }
- with_network_retry { client.get_table(@project, @dataset, table) }
+ table = Helper.chomp_partition_decorator(table)
+ dataset ||= @dataset
+ Embulk.logger.info { "embulk-output-bigquery: Get table... #{@project}:#{dataset}.#{table}" }
+ with_network_retry { client.get_table(@project, dataset, table) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 404
- raise NotFoundError, "Table #{@project}:#{@dataset}.#{table} is not found"
+ raise NotFoundError, "Table #{@project}:#{dataset}.#{table} is not found"
end
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
- "embulk-output-bigquery: get_table(#{@project}, #{@dataset}, #{table}), response:#{response}"
+ "embulk-output-bigquery: get_table(#{@project}, #{dataset}, #{table}), response:#{response}"
}
- raise Error, "failed to get table #{@project}:#{@dataset}.#{table}, response:#{response}"
+ raise Error, "failed to get table #{@project}:#{dataset}.#{table}, response:#{response}"
+ end
+ end
+
+ # Is this only a way to drop partition?
+ def delete_partition(table_with_partition, dataset: nil)
+ dataset ||= @dataset
+ begin
+ table = Helper.chomp_partition_decorator(table_with_partition)
+ get_table(table, dataset: dataset)
+ rescue NotFoundError
+ else
+ Embulk.logger.info { "embulk-output-bigquery: Delete partition... #{@project}:#{dataset}.#{table_with_partition}" }
+ Tempfile.create('embulk_output_bigquery_empty_file_') do |fp|
+ load(fp.path, table_with_partition, write_disposition: 'WRITE_TRUNCATE')
+ end
end
end
end
end
end