lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.3.7 vs lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.4.0

- old
+ new

@@ -15,10 +15,18 @@
         @schema = schema
         reset_fields(fields) if fields
         @project = @task['project']
         @dataset = @task['dataset']
+
+        @task['source_format'] ||= 'CSV'
+        @task['max_bad_records'] ||= 0
+        @task['field_delimiter'] ||= ','
+        @task['field_delimiter'] = @task['source_format'] == 'CSV' ? @task['field_delimiter'] : nil
+        @task['encoding'] ||= 'UTF-8'
+        @task['ignore_unknown_values'] = false if @task['ignore_unknown_values'].nil?
+        @task['allow_quoted_newlines'] = false if @task['allow_quoted_newlines'].nil?
       end

       def fields
         return @fields if @fields
         if @task['schema_file']
@@ -141,11 +149,11 @@
           responses[idx] = response
         end
         responses
       end

-      def load(path, table)
+      def load(path, table, write_disposition: 'WRITE_APPEND')
         with_job_retry do
           begin
             if File.exist?(path)
               # As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says,
               # we should generate job_id in client code, otherwise, retrying would cause duplication
@@ -173,11 +181,11 @@
                   table_id: table,
                 },
                 schema: {
                   fields: fields,
                 },
-                write_disposition: 'WRITE_APPEND',
+                write_disposition: write_disposition,
                 source_format: @task['source_format'],
                 max_bad_records: @task['max_bad_records'],
                 field_delimiter: @task['source_format'] == 'CSV' ? @task['field_delimiter'] : nil,
                 encoding: @task['encoding'],
                 ignore_unknown_values: @task['ignore_unknown_values'],
@@ -231,19 +239,19 @@
             configuration: {
               copy: {
                 create_disposition: 'CREATE_IF_NEEDED',
                 write_disposition: write_disposition,
                 source_table: {
-                 project_id: @project,
-                 dataset_id: @dataset,
-                 table_id: source_table,
-               },
-               destination_table: {
-                 project_id: @project,
-                 dataset_id: destination_dataset,
-                 table_id: destination_table,
-               },
+                  project_id: @project,
+                  dataset_id: @dataset,
+                  table_id: source_table,
+                },
+                destination_table: {
+                  project_id: @project,
+                  dataset_id: destination_dataset,
+                  table_id: destination_table,
+                },
               }
             }
           }

           opts = {}
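
The constructor hunk above gives previously implicit settings explicit defaults: a config that omits them now loads comma-delimited UTF-8 CSV with zero tolerated bad records, and field_delimiter is normalized to nil whenever source_format is not CSV. The load hunks turn the hard-coded 'WRITE_APPEND' into a write_disposition keyword argument with the same default, so 0.3.7 call sites keep their behavior. A minimal usage sketch, not part of the diff; task, schema, and fields stand in for the objects the plugin normally builds, assuming the BigqueryClient.new(task, schema, fields) signature implied by the constructor above:

    require 'embulk/output/bigquery/bigquery_client'

    client = Embulk::Output::Bigquery::BigqueryClient.new(task, schema, fields)

    # Default disposition: append rows, exactly as 0.3.7 hard-coded it.
    client.load('/tmp/part_000.csv', 'target_table')

    # New in 0.4.0: override the disposition for a single load job.
    client.load('/tmp/part_000.csv', 'target_table', write_disposition: 'WRITE_TRUNCATE')
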
#{@project}:#{dataset}.#{table}" } body = { table_reference: { table_id: table, }, schema: { fields: fields, } } + if options['time_partitioning'] + body[:time_partitioning] = { + type: options['time_partitioning']['type'], + expiration_ms: options['time_partitioning']['expiration_ms'], + } + end opts = {} - Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@project}, #{@dataset}, #{body}, #{opts})" } - with_network_retry { client.insert_table(@project, @dataset, body, opts) } + Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{body}, #{opts})" } + with_network_retry { client.insert_table(@project, dataset, body, opts) } rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e if e.status_code == 409 && /Already Exists:/ =~ e.message # ignore 'Already Exists' error return end response = {status_code: e.status_code, message: e.message, error_class: e.class} Embulk.logger.error { - "embulk-output-bigquery: insert_table(#{@project}, #{@dataset}, #{body}, #{opts}), response:#{response}" + "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{body}, #{opts}), response:#{response}" } - raise Error, "failed to create table #{@project}:#{@dataset}.#{table}, response:#{response}" + raise Error, "failed to create table #{@project}:#{dataset}.#{table}, response:#{response}" end end - def delete_table(table) + def delete_table(table, dataset: nil) begin - Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@project}:#{@dataset}.#{table}" } - with_network_retry { client.delete_table(@project, @dataset, table) } + table = Helper.chomp_partition_decorator(table) + dataset ||= @dataset + Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@project}:#{dataset}.#{table}" } + with_network_retry { client.delete_table(@project, dataset, table) } rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e if e.status_code == 404 && /Not found:/ =~ e.message # ignore 'Not Found' error return end response = {status_code: e.status_code, message: e.message, error_class: e.class} Embulk.logger.error { - "embulk-output-bigquery: delete_table(#{@project}, #{@dataset}, #{table}), response:#{response}" + "embulk-output-bigquery: delete_table(#{@project}, #{dataset}, #{table}), response:#{response}" } - raise Error, "failed to delete table #{@project}:#{@dataset}.#{table}, response:#{response}" + raise Error, "failed to delete table #{@project}:#{dataset}.#{table}, response:#{response}" end end - def get_table(table) + def get_table(table, dataset: nil) begin - Embulk.logger.info { "embulk-output-bigquery: Get table... #{@project}:#{@dataset}.#{table}" } - with_network_retry { client.get_table(@project, @dataset, table) } + table = Helper.chomp_partition_decorator(table) + dataset ||= @dataset + Embulk.logger.info { "embulk-output-bigquery: Get table... 
#{@project}:#{dataset}.#{table}" } + with_network_retry { client.get_table(@project, dataset, table) } rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e if e.status_code == 404 - raise NotFoundError, "Table #{@project}:#{@dataset}.#{table} is not found" + raise NotFoundError, "Table #{@project}:#{dataset}.#{table} is not found" end response = {status_code: e.status_code, message: e.message, error_class: e.class} Embulk.logger.error { - "embulk-output-bigquery: get_table(#{@project}, #{@dataset}, #{table}), response:#{response}" + "embulk-output-bigquery: get_table(#{@project}, #{dataset}, #{table}), response:#{response}" } - raise Error, "failed to get table #{@project}:#{@dataset}.#{table}, response:#{response}" + raise Error, "failed to get table #{@project}:#{dataset}.#{table}, response:#{response}" + end + end + + # Is this only a way to drop partition? + def delete_partition(table_with_partition, dataset: nil) + dataset ||= @dataset + begin + table = Helper.chomp_partition_decorator(table_with_partition) + get_table(table, dataset: dataset) + rescue NotFoundError + else + Embulk.logger.info { "embulk-output-bigquery: Delete partition... #{@project}:#{dataset}.#{table_with_partition}" } + Tempfile.create('embulk_output_bigquery_empty_file_') do |fp| + load(fp.path, table_with_partition, write_disposition: 'WRITE_TRUNCATE') + end end end end end end
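
In the table helpers above, create_table, delete_table, and get_table now strip any partition decorator from the table name via Helper.chomp_partition_decorator, take a dataset: keyword that falls back to @dataset, and create_table can attach a time_partitioning clause read from its new options hash. A sketch with hypothetical table and dataset names; the 'time_partitioning' keys mirror the ones create_table reads:

    options = {
      'time_partitioning' => {
        'type'          => 'DAY',
        'expiration_ms' => 90 * 24 * 60 * 60 * 1000,  # expire partitions after ~90 days
      }
    }
    # The "$20170101" decorator is chomped before the API call, so this
    # creates the base table "events" in the "staging" dataset.
    client.create_table('events$20170101', dataset: 'staging', options: options)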
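
delete_partition is entirely new. As its comment hints, this client exposes no dedicated delete-partition API call, so the method empties one partition by loading an empty temp file into the decorated table with write_disposition: 'WRITE_TRUNCATE'; if the base table does not exist, the NotFoundError raised by get_table is swallowed and no load job runs. Continuing the hypothetical client above:

    # Truncates only the 2017-01-01 partition of "events"; a no-op when
    # the base table is missing.
    client.delete_partition('events$20170101')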