lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.6.5 vs lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.6.6

- old
+ new

@@ -16,10 +16,11 @@ super(task, scope, client_class) @schema = schema reset_fields(fields) if fields @project = @task['project'] + @destination_project = @task['destination_project'] @dataset = @task['dataset'] @location = @task['location'] @location_for_log = @location.nil? ? 'us/eu' : @location @task['source_format'] ||= 'CSV' @@ -78,21 +79,21 @@ with_job_retry do begin # As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says, # we should generate job_id in client code, otherwise, retrying would cause duplication job_id = "embulk_load_job_#{SecureRandom.uuid}" - Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{object_uris} => #{@project}:#{@dataset}.#{table} in #{@location_for_log}" } + Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{object_uris} => #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}" } body = { job_reference: { project_id: @project, job_id: job_id, }, configuration: { load: { destination_table: { - project_id: @project, + project_id: @destination_project, dataset_id: @dataset, table_id: table, }, schema: { fields: fields, @@ -128,11 +129,11 @@ rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e response = {status_code: e.status_code, message: e.message, error_class: e.class} Embulk.logger.error { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}" } - raise Error, "failed to load #{object_uris} to #{@project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}" + raise Error, "failed to load #{object_uris} to #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}" end end end def load_in_parallel(paths, table) @@ -169,11 +170,11 @@ begin if File.exist?(path) # As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says, # we should generate job_id in client code, otherwise, retrying would cause duplication job_id = "embulk_load_job_#{SecureRandom.uuid}" - Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{path} => #{@project}:#{@dataset}.#{table} in #{@location_for_log}" } + Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{path} => #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}" } else Embulk.logger.info { "embulk-output-bigquery: Load job starting... #{path} does not exist, skipped" } return end @@ -183,11 +184,11 @@ job_id: job_id, }, configuration: { load: { destination_table: { - project_id: @project, + project_id: @destination_project, dataset_id: @dataset, table_id: table, }, schema: { fields: fields, @@ -230,11 +231,11 @@ rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e response = {status_code: e.status_code, message: e.message, error_class: e.class} Embulk.logger.error { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}" } - raise Error, "failed to load #{path} to #{@project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}" + raise Error, "failed to load #{path} to #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}" end end end def copy(source_table, destination_table, destination_dataset = nil, write_disposition: 'WRITE_TRUNCATE') @@ -243,11 +244,11 @@ destination_dataset ||= @dataset job_id = "embulk_copy_job_#{SecureRandom.uuid}" Embulk.logger.info { "embulk-output-bigquery: Copy job starting... job_id:[#{job_id}] " \ - "#{@project}:#{@dataset}.#{source_table} => #{@project}:#{destination_dataset}.#{destination_table}" + "#{@destination_project}:#{@dataset}.#{source_table} => #{@destination_project}:#{destination_dataset}.#{destination_table}" } body = { job_reference: { project_id: @project, @@ -256,16 +257,16 @@ configuration: { copy: { create_deposition: 'CREATE_IF_NEEDED', write_disposition: write_disposition, source_table: { - project_id: @project, + project_id: @destination_project, dataset_id: @dataset, table_id: source_table, }, destination_table: { - project_id: @project, + project_id: @destination_project, dataset_id: destination_dataset, table_id: destination_table, }, } } @@ -282,12 +283,12 @@ rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e response = {status_code: e.status_code, message: e.message, error_class: e.class} Embulk.logger.error { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}" } - raise Error, "failed to copy #{@project}:#{@dataset}.#{source_table} " \ - "to #{@project}:#{destination_dataset}.#{destination_table}, response:#{response}" + raise Error, "failed to copy #{@destination_project}:#{@dataset}.#{source_table} " \ + "to #{@destination_project}:#{destination_dataset}.#{destination_table}, response:#{response}" end end end def wait_load(kind, response) @@ -352,11 +353,11 @@ end def create_dataset(dataset = nil, reference: nil) dataset ||= @dataset begin - Embulk.logger.info { "embulk-output-bigquery: Create dataset... #{@project}:#{dataset} in #{@location_for_log}" } + Embulk.logger.info { "embulk-output-bigquery: Create dataset... #{@destination_project}:#{dataset} in #{@location_for_log}" } hint = {} if reference response = get_dataset(reference) hint = { access: response.access } end @@ -380,29 +381,29 @@ response = {status_code: e.status_code, message: e.message, error_class: e.class} Embulk.logger.error { "embulk-output-bigquery: insert_dataset(#{@project}, #{body}, #{opts}), response:#{response}" } - raise Error, "failed to create dataset #{@project}:#{dataset} in #{@location_for_log}, response:#{response}" + raise Error, "failed to create dataset #{@destination_project}:#{dataset} in #{@location_for_log}, response:#{response}" end end def get_dataset(dataset = nil) dataset ||= @dataset begin - Embulk.logger.info { "embulk-output-bigquery: Get dataset... #{@project}:#{dataset}" } - with_network_retry { client.get_dataset(@project, dataset) } + Embulk.logger.info { "embulk-output-bigquery: Get dataset... #{@destination_project}:#{dataset}" } + with_network_retry { client.get_dataset(@destination_project, dataset) } rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e if e.status_code == 404 - raise NotFoundError, "Dataset #{@project}:#{dataset} is not found" + raise NotFoundError, "Dataset #{@destination_project}:#{dataset} is not found" end response = {status_code: e.status_code, message: e.message, error_class: e.class} Embulk.logger.error { - "embulk-output-bigquery: get_dataset(#{@project}, #{dataset}), response:#{response}" + "embulk-output-bigquery: get_dataset(#{@destination_project}, #{dataset}), response:#{response}" } - raise Error, "failed to get dataset #{@project}:#{dataset}, response:#{response}" + raise Error, "failed to get dataset #{@destination_project}:#{dataset}, response:#{response}" end end def create_table_if_not_exists(table, dataset: nil, options: nil) begin @@ -412,11 +413,11 @@ if Helper.has_partition_decorator?(table) options['time_partitioning'] ||= {'type' => 'DAY'} table = Helper.chomp_partition_decorator(table) end - Embulk.logger.info { "embulk-output-bigquery: Create table... #{@project}:#{dataset}.#{table}" } + Embulk.logger.info { "embulk-output-bigquery: Create table... #{@destination_project}:#{dataset}.#{table}" } body = { table_reference: { table_id: table, }, schema: { @@ -450,11 +451,11 @@ response = {status_code: e.status_code, message: e.message, error_class: e.class} Embulk.logger.error { "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts}), response:#{response}" } - raise Error, "failed to create table #{@project}:#{dataset}.#{table} in #{@location_for_log}, response:#{response}" + raise Error, "failed to create table #{@destination_project}:#{dataset}.#{table} in #{@location_for_log}, response:#{response}" end end def delete_table(table, dataset: nil) table = Helper.chomp_partition_decorator(table) @@ -467,23 +468,23 @@ # if `table` with a partition decorator is given, a partition is deleted. def delete_table_or_partition(table, dataset: nil) begin dataset ||= @dataset - Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@project}:#{dataset}.#{table}" } - with_network_retry { client.delete_table(@project, dataset, table) } + Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@destination_project}:#{dataset}.#{table}" } + with_network_retry { client.delete_table(@destination_project, dataset, table) } rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e if e.status_code == 404 && /Not found:/ =~ e.message # ignore 'Not Found' error return end response = {status_code: e.status_code, message: e.message, error_class: e.class} Embulk.logger.error { - "embulk-output-bigquery: delete_table(#{@project}, #{dataset}, #{table}), response:#{response}" + "embulk-output-bigquery: delete_table(#{@destination_project}, #{dataset}, #{table}), response:#{response}" } - raise Error, "failed to delete table #{@project}:#{dataset}.#{table}, response:#{response}" + raise Error, "failed to delete table #{@destination_project}:#{dataset}.#{table}, response:#{response}" end end def get_table(table, dataset: nil) table = Helper.chomp_partition_decorator(table) @@ -495,21 +496,21 @@ end def get_table_or_partition(table, dataset: nil) begin dataset ||= @dataset - Embulk.logger.info { "embulk-output-bigquery: Get table... #{@project}:#{dataset}.#{table}" } - with_network_retry { client.get_table(@project, dataset, table) } + Embulk.logger.info { "embulk-output-bigquery: Get table... #{@destination_project}:#{dataset}.#{table}" } + with_network_retry { client.get_table(@destination_project, dataset, table) } rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e if e.status_code == 404 - raise NotFoundError, "Table #{@project}:#{dataset}.#{table} is not found" + raise NotFoundError, "Table #{@destination_project}:#{dataset}.#{table} is not found" end response = {status_code: e.status_code, message: e.message, error_class: e.class} Embulk.logger.error { - "embulk-output-bigquery: get_table(#{@project}, #{dataset}, #{table}), response:#{response}" + "embulk-output-bigquery: get_table(#{@destination_project}, #{dataset}, #{table}), response:#{response}" } - raise Error, "failed to get table #{@project}:#{dataset}.#{table}, response:#{response}" + raise Error, "failed to get table #{@destination_project}:#{dataset}.#{table}, response:#{response}" end end end end end