lib/embulk/output/bigquery/bigquery_client.rb: embulk-output-bigquery 0.6.5 vs 0.6.6
- removed (0.6.5)
+ added (0.6.6)
@@ -16,10 +16,11 @@
super(task, scope, client_class)
@schema = schema
reset_fields(fields) if fields
@project = @task['project']
+ @destination_project = @task['destination_project']
@dataset = @task['dataset']
@location = @task['location']
@location_for_log = @location.nil? ? 'us/eu' : @location
@task['source_format'] ||= 'CSV'
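The new @destination_project is the only state this release adds; every change below swaps it into table and dataset references, while @project keeps identifying the project that runs (and is billed for) the jobs. A minimal sketch of the defaulting this relies on, assuming the plugin's configure step (outside this file) falls back to the billing project when the new key is absent:

  # Assumed defaulting in the config layer, not shown in this diff: with no
  # destination_project set, 0.6.5-era configs keep writing to the same tables.
  task['destination_project'] ||= task['project']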
@@ -78,21 +79,21 @@
with_job_retry do
begin
# As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says,
# we should generate job_id in client code, otherwise, retrying would cause duplication
job_id = "embulk_load_job_#{SecureRandom.uuid}"
- Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{object_uris} => #{@project}:#{@dataset}.#{table} in #{@location_for_log}" }
+ Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{object_uris} => #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}" }
body = {
job_reference: {
project_id: @project,
job_id: job_id,
},
configuration: {
load: {
destination_table: {
- project_id: @project,
+ project_id: @destination_project,
dataset_id: @dataset,
table_id: table,
},
schema: {
fields: fields,
@@ -128,11 +129,11 @@
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
"embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
}
- raise Error, "failed to load #{object_uris} to #{@project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}"
+ raise Error, "failed to load #{object_uris} to #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}"
end
end
end
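The split above recurs throughout the file: job_reference.project_id stays @project, so the load job is created and billed in the configured project, while destination_table.project_id becomes @destination_project, so the data can land somewhere else. Trimmed to the two fields that matter, with illustrative project names that are not from the diff:

  body = {
    job_reference: {
      project_id: 'billing-project',     # @project: creates and pays for the job
      job_id: job_id,
    },
    configuration: {
      load: {
        destination_table: {
          project_id: 'data-project',    # @destination_project: owns the table
          dataset_id: 'my_dataset',
          table_id: 'my_table',
        },
      },
    },
  }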
def load_in_parallel(paths, table)
@@ -169,11 +170,11 @@
begin
if File.exist?(path)
# As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says,
# we should generate job_id in client code, otherwise, retrying would cause duplication
job_id = "embulk_load_job_#{SecureRandom.uuid}"
- Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{path} => #{@project}:#{@dataset}.#{table} in #{@location_for_log}" }
+ Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{path} => #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}" }
else
Embulk.logger.info { "embulk-output-bigquery: Load job starting... #{path} does not exist, skipped" }
return
end
@@ -183,11 +184,11 @@
job_id: job_id,
},
configuration: {
load: {
destination_table: {
- project_id: @project,
+ project_id: @destination_project,
dataset_id: @dataset,
table_id: table,
},
schema: {
fields: fields,
@@ -230,11 +231,11 @@
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
"embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
}
- raise Error, "failed to load #{path} to #{@project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}"
+ raise Error, "failed to load #{path} to #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}"
end
end
end
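Both load paths keep the pattern the inline comments call out: the job_id is minted client-side, so a retried insert_job re-submits the same id and the server can recognize it as the same job instead of starting a second load. The idea in isolation:

  require 'securerandom'

  # One id per logical load; reusing it across retry attempts is what makes
  # the retry safe against duplicate loads.
  job_id = "embulk_load_job_#{SecureRandom.uuid}"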
def copy(source_table, destination_table, destination_dataset = nil, write_disposition: 'WRITE_TRUNCATE')
@@ -243,11 +244,11 @@
destination_dataset ||= @dataset
job_id = "embulk_copy_job_#{SecureRandom.uuid}"
Embulk.logger.info {
"embulk-output-bigquery: Copy job starting... job_id:[#{job_id}] " \
- "#{@project}:#{@dataset}.#{source_table} => #{@project}:#{destination_dataset}.#{destination_table}"
+ "#{@destination_project}:#{@dataset}.#{source_table} => #{@destination_project}:#{destination_dataset}.#{destination_table}"
}
body = {
job_reference: {
project_id: @project,
@@ -256,16 +257,16 @@
configuration: {
copy: {
create_deposition: 'CREATE_IF_NEEDED', # (sic) typo for BigQuery's 'create_disposition'; unchanged in both versions
write_disposition: write_disposition,
source_table: {
- project_id: @project,
+ project_id: @destination_project,
dataset_id: @dataset,
table_id: source_table,
},
destination_table: {
- project_id: @project,
+ project_id: @destination_project,
dataset_id: destination_dataset,
table_id: destination_table,
},
}
}
@@ -282,12 +283,12 @@
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
"embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
}
- raise Error, "failed to copy #{@project}:#{@dataset}.#{source_table} " \
- "to #{@project}:#{destination_dataset}.#{destination_table}, response:#{response}"
+ raise Error, "failed to copy #{@destination_project}:#{@dataset}.#{source_table} " \
+ "to #{@destination_project}:#{destination_dataset}.#{destination_table}, response:#{response}"
end
end
end
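Note the reach of the change in copy: both source_table and destination_table now point at @destination_project, so the copy happens entirely within the destination project, while the copy job itself is still submitted to @project. A hypothetical call, receiver and table names illustrative:

  # Append a temp table into the final table; both live in the destination
  # project, but the copy job runs (and is billed) in the billing project.
  bigquery_client.copy('LOAD_TEMP_my_table', 'my_table', write_disposition: 'WRITE_APPEND')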
def wait_load(kind, response)
@@ -352,11 +353,11 @@
end
def create_dataset(dataset = nil, reference: nil)
dataset ||= @dataset
begin
- Embulk.logger.info { "embulk-output-bigquery: Create dataset... #{@project}:#{dataset} in #{@location_for_log}" }
+ Embulk.logger.info { "embulk-output-bigquery: Create dataset... #{@destination_project}:#{dataset} in #{@location_for_log}" }
hint = {}
if reference
response = get_dataset(reference)
hint = { access: response.access }
end
@@ -380,29 +381,29 @@
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
"embulk-output-bigquery: insert_dataset(#{@project}, #{body}, #{opts}), response:#{response}"
}
- raise Error, "failed to create dataset #{@project}:#{dataset} in #{@location_for_log}, response:#{response}"
+ raise Error, "failed to create dataset #{@destination_project}:#{dataset} in #{@location_for_log}, response:#{response}"
end
end
def get_dataset(dataset = nil)
dataset ||= @dataset
begin
- Embulk.logger.info { "embulk-output-bigquery: Get dataset... #{@project}:#{dataset}" }
- with_network_retry { client.get_dataset(@project, dataset) }
+ Embulk.logger.info { "embulk-output-bigquery: Get dataset... #{@destination_project}:#{dataset}" }
+ with_network_retry { client.get_dataset(@destination_project, dataset) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 404
- raise NotFoundError, "Dataset #{@project}:#{dataset} is not found"
+ raise NotFoundError, "Dataset #{@destination_project}:#{dataset} is not found"
end
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
- "embulk-output-bigquery: get_dataset(#{@project}, #{dataset}), response:#{response}"
+ "embulk-output-bigquery: get_dataset(#{@destination_project}, #{dataset}), response:#{response}"
}
- raise Error, "failed to get dataset #{@project}:#{dataset}, response:#{response}"
+ raise Error, "failed to get dataset #{@destination_project}:#{dataset}, response:#{response}"
end
end
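Because get_dataset now resolves against @destination_project, the credential needs read access to datasets there, not only in the billing project. A hedged sketch of the ensure-exists flow a caller could build from these two methods (receiver name illustrative; NotFoundError is the class raised above):

  begin
    bigquery_client.get_dataset        # looked up in destination_project
  rescue NotFoundError
    bigquery_client.create_dataset     # now logged against destination_project
  end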
def create_table_if_not_exists(table, dataset: nil, options: nil)
begin
@@ -412,11 +413,11 @@
if Helper.has_partition_decorator?(table)
options['time_partitioning'] ||= {'type' => 'DAY'}
table = Helper.chomp_partition_decorator(table)
end
- Embulk.logger.info { "embulk-output-bigquery: Create table... #{@project}:#{dataset}.#{table}" }
+ Embulk.logger.info { "embulk-output-bigquery: Create table... #{@destination_project}:#{dataset}.#{table}" }
body = {
table_reference: {
table_id: table,
},
schema: {
@@ -450,11 +451,11 @@
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
"embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts}), response:#{response}"
}
- raise Error, "failed to create table #{@project}:#{dataset}.#{table} in #{@location_for_log}, response:#{response}"
+ raise Error, "failed to create table #{@destination_project}:#{dataset}.#{table} in #{@location_for_log}, response:#{response}"
end
end
def delete_table(table, dataset: nil)
table = Helper.chomp_partition_decorator(table)
@@ -467,23 +468,23 @@
# if `table` with a partition decorator is given, a partition is deleted.
def delete_table_or_partition(table, dataset: nil)
begin
dataset ||= @dataset
- Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@project}:#{dataset}.#{table}" }
- with_network_retry { client.delete_table(@project, dataset, table) }
+ Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@destination_project}:#{dataset}.#{table}" }
+ with_network_retry { client.delete_table(@destination_project, dataset, table) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 404 && /Not found:/ =~ e.message
# ignore 'Not Found' error
return
end
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
- "embulk-output-bigquery: delete_table(#{@project}, #{dataset}, #{table}), response:#{response}"
+ "embulk-output-bigquery: delete_table(#{@destination_project}, #{dataset}, #{table}), response:#{response}"
}
- raise Error, "failed to delete table #{@project}:#{dataset}.#{table}, response:#{response}"
+ raise Error, "failed to delete table #{@destination_project}:#{dataset}.#{table}, response:#{response}"
end
end
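As the comment before delete_table_or_partition explains, a partition decorator selects a single partition, while delete_table chomps the decorator first and therefore always drops the whole table; both now resolve the table in the destination project. Illustrative calls, receiver and names hypothetical:

  bigquery_client.delete_table_or_partition('my_table$20200101')  # one partition
  bigquery_client.delete_table('my_table$20200101')               # decorator stripped: whole table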
def get_table(table, dataset: nil)
table = Helper.chomp_partition_decorator(table)
@@ -495,21 +496,21 @@
end
def get_table_or_partition(table, dataset: nil)
begin
dataset ||= @dataset
- Embulk.logger.info { "embulk-output-bigquery: Get table... #{@project}:#{dataset}.#{table}" }
- with_network_retry { client.get_table(@project, dataset, table) }
+ Embulk.logger.info { "embulk-output-bigquery: Get table... #{@destination_project}:#{dataset}.#{table}" }
+ with_network_retry { client.get_table(@destination_project, dataset, table) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 404
- raise NotFoundError, "Table #{@project}:#{dataset}.#{table} is not found"
+ raise NotFoundError, "Table #{@destination_project}:#{dataset}.#{table} is not found"
end
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
- "embulk-output-bigquery: get_table(#{@project}, #{dataset}, #{table}), response:#{response}"
+ "embulk-output-bigquery: get_table(#{@destination_project}, #{dataset}, #{table}), response:#{response}"
}
- raise Error, "failed to get table #{@project}:#{dataset}.#{table}, response:#{response}"
+ raise Error, "failed to get table #{@destination_project}:#{dataset}.#{table}, response:#{response}"
end
end
end
end
end
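Taken together, 0.6.6 splits what used to be one setting into two roles. A hypothetical task configuration making the split explicit (all names illustrative):

  task = {
    'project'             => 'billing-project',  # job_reference: runs and bills load/copy jobs
    'destination_project' => 'data-project',     # table and dataset references: owns the data
    'dataset'             => 'my_dataset',
    'location'            => 'US',
  }

If destination_project is omitted, the assumed default shown after the constructor keeps both roles on project, preserving 0.6.5 behavior.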