lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.3.6 vs lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.3.7
- old
+ new
@@ -38,11 +38,11 @@
def reset_fields(fields = nil)
@fields = fields
self.fields
end
- def with_retry_job(&block)
+ def with_job_retry(&block)
retries = 0
begin
yield
rescue BackendError, InternalError => e
if retries < @task['retries']
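The renamed wrapper above pairs with a second helper introduced further down in this diff, with_network_retry: the job-level wrapper retries whole jobs when BigQuery itself reports a BackendError or InternalError, while the network-level wrapper guards individual API calls against transport failures. A hedged sketch of how the two layers nest (illustrative only, not the gem's code):

with_job_retry do
  # retries the whole job when BigQuery reports BackendError / InternalError
  response = with_network_retry {
    # retries a single API call on transient network errors
    client.insert_job(@project, body, opts)
  }
  wait_load('Load', response)
end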
@@ -57,11 +57,11 @@
end
# @param object_uris [Array] array of GCS paths such as gs://bucket/path
# @return [Array] responses
def load_from_gcs(object_uris, table)
- with_retry_job do
+ with_job_retry do
begin
# As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says,
# we should generate job_id in client code; otherwise, retrying would cause duplication
if @task['prevent_duplicate_insert'] and (@task['mode'] == 'append' or @task['mode'] == 'append_direct')
job_id = Helper.create_load_job_id(@task, path, fields)
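Helper.create_load_job_id itself is not part of this diff. A hypothetical sketch of how such a deterministic id could be built (the method name and hashed inputs below are assumptions, not the gem's implementation): deriving the id from the destination, the source path, and the field schema means a client-side retry re-submits the same job_id, so BigQuery rejects the duplicate submission instead of loading the data twice.

require 'digest/sha1'

# Hypothetical helper, not the gem's actual Helper.create_load_job_id:
# a stable id derived from the inputs that identify one load.
def create_load_job_id_sketch(task, path, fields)
  digest = Digest::SHA1.hexdigest(
    [task['dataset'], task['table'], path, fields.to_s].join("\n")
  )
  "embulk_load_job_#{digest}"
end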
@@ -97,11 +97,11 @@
}
}
opts = {}
Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
- response = client.insert_job(@project, body, opts)
+ response = with_network_retry { client.insert_job(@project, body, opts) }
unless @task['is_skip_job_result_check']
response = wait_load('Load', response)
end
[response]
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
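Each client.insert_job call is now wrapped in with_network_retry, a helper whose body does not appear in this diff. A minimal sketch of the pattern, assuming it retries a bounded number of times on transient connection errors and re-raises anything it cannot recover (the exact exception list below is an assumption):

def with_network_retry(&block)
  retries = 0
  begin
    yield
  rescue Errno::ECONNRESET, Errno::ETIMEDOUT, SocketError => e  # assumed set of transient errors
    if retries < @task['retries']
      retries += 1
      Embulk.logger.warn {
        "embulk-output-bigquery: network error #{e.class}, retry #{retries}/#{@task['retries']}"
      }
      retry
    else
      raise e
    end
  end
end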
@@ -126,27 +126,27 @@
# So, we dropped it. See https://github.com/embulk/embulk-output-bigquery/pull/35
responses = []
threads = []
Embulk.logger.debug { "embulk-output-bigquery: LOAD IN PARALLEL #{paths}" }
paths.each_with_index do |path, idx|
- threads << Thread.new do
+ threads << Thread.new(path, idx) do |path, idx|
# I am not sure whether google-api-ruby-client is thread-safe,
# so let me create a new instance for each thread to be safe
bigquery = self.class.new(@task, @schema, fields)
response = bigquery.load(path, table)
[idx, response]
end
end
ThreadsWait.all_waits(*threads) do |th|
idx, response = th.value # raise errors occurred in threads
- responses[idx] = response if idx
+ responses[idx] = response
end
responses
end
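Thread.new(path, idx) above hands each worker thread its own copies of path and idx instead of relying on closure capture, the pattern Ruby's Thread documentation recommends for values that can change in the enclosing scope; it also means the block always returns a real index, so the `if idx` guard on responses[idx] is no longer needed. A self-contained illustration of the difference (standalone example, not gem code):

# When the loop variable lives in the enclosing scope, threads that capture it
# can all see its latest value; passing it to Thread.new hands each thread a
# private copy taken at creation time.
threads = []
i = 0
while i < 3
  threads << Thread.new(i) do |n|
    sleep 0.01
    puts "copy: #{n}"        # prints 0, 1, 2 (in some order)
  end
  # Thread.new { puts i } here could print 3 for every thread,
  # because all three blocks would share the same `i`.
  i += 1
end
threads.each(&:join)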
def load(path, table)
- with_retry_job do
+ with_job_retry do
begin
if File.exist?(path)
# As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says,
# we should generate job_id in client code; otherwise, retrying would cause duplication
if @task['prevent_duplicate_insert'] and (@task['mode'] == 'append' or @task['mode'] == 'append_direct')
@@ -194,11 +194,11 @@
# timeout_sec: @task['timeout_sec'],
# open_timeout_sec: @task['open_timeout_sec']
# },
}
Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
- response = client.insert_job(@project, body, opts)
+ response = with_network_retry { client.insert_job(@project, body, opts) }
if @task['is_skip_job_result_check']
response
else
response = wait_load('Load', response)
end
@@ -211,11 +211,11 @@
end
end
end
def copy(source_table, destination_table, destination_dataset = nil, write_disposition: 'WRITE_TRUNCATE')
- with_retry_job do
+ with_job_retry do
begin
destination_dataset ||= @dataset
job_id = "embulk_copy_job_#{SecureRandom.uuid}"
Embulk.logger.info {
@@ -246,11 +246,11 @@
}
}
opts = {}
Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
- response = client.insert_job(@project, body, opts)
+ response = with_network_retry { client.insert_job(@project, body, opts) }
wait_load('Copy', response)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
"embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
@@ -287,11 +287,11 @@
Embulk.logger.info {
"embulk-output-bigquery: #{kind} job checking... " \
"job_id:[#{job_id}] elapsed_time:#{elapsed.to_f}sec status:[#{status}]"
}
sleep wait_interval
- _response = client.get_job(@project, job_id)
+ _response = with_network_retry { client.get_job(@project, job_id) }
end
end
# cf. http://www.rubydoc.info/github/google/google-api-ruby-client/Google/Apis/BigqueryV2/JobStatus#errors-instance_method
# `errors` returns Array<Google::Apis::BigqueryV2::ErrorProto> if any error exists.
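A sketch of what consuming that field can look like after the polling loop above returns (illustrative only; accessor names per the linked JobStatus and ErrorProto docs):

errors = response.status.errors   # nil unless the job reported errors
if errors
  errors.each do |error|
    Embulk.logger.error {
      "embulk-output-bigquery: job error reason:#{error.reason} location:#{error.location} message:#{error.message}"
    }
  end
end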
@@ -328,11 +328,11 @@
dataset_id: dataset,
},
}.merge(hint)
opts = {}
Embulk.logger.debug { "embulk-output-bigquery: insert_dataset(#{@project}, #{dataset}, #{body}, #{opts})" }
- client.insert_dataset(@project, body, opts)
+ with_network_retry { client.insert_dataset(@project, body, opts) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 409 && /Already Exists:/ =~ e.message
# ignore 'Already Exists' error
return
end
@@ -347,11 +347,11 @@
def get_dataset(dataset = nil)
dataset ||= @dataset
begin
Embulk.logger.info { "embulk-output-bigquery: Get dataset... #{@project}:#{@dataset}" }
- client.get_dataset(@project, dataset)
+ with_network_retry { client.get_dataset(@project, dataset) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 404
raise NotFoundError, "Dataset #{@project}:#{dataset} is not found"
end
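get_dataset raising NotFoundError on 404, together with create_dataset ignoring 409 Already Exists, lets a caller build an idempotent "ensure the dataset exists" step. A usage sketch (the wrapper method below is assumed, not part of this file, and assumes it lives where NotFoundError resolves):

def ensure_dataset!(bigquery, dataset)
  bigquery.get_dataset(dataset)
rescue NotFoundError                 # get_dataset raises this on a 404, as shown above
  bigquery.create_dataset(dataset)   # create_dataset ignores 409 Already Exists
end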
@@ -374,11 +374,11 @@
fields: fields,
}
}
opts = {}
Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@project}, #{@dataset}, #{body}, #{opts})" }
- client.insert_table(@project, @dataset, body, opts)
+ with_network_retry { client.insert_table(@project, @dataset, body, opts) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 409 && /Already Exists:/ =~ e.message
# ignore 'Already Exists' error
return
end
@@ -392,11 +392,11 @@
end
def delete_table(table)
begin
Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@project}:#{@dataset}.#{table}" }
- client.delete_table(@project, @dataset, table)
+ with_network_retry { client.delete_table(@project, @dataset, table) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 404 && /Not found:/ =~ e.message
# ignore 'Not Found' error
return
end
@@ -410,10 +410,10 @@
end
def get_table(table)
begin
Embulk.logger.info { "embulk-output-bigquery: Get table... #{@project}:#{@dataset}.#{table}" }
- client.get_table(@project, @dataset, table)
+ with_network_retry { client.get_table(@project, @dataset, table) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 404
raise NotFoundError, "Table #{@project}:#{@dataset}.#{table} is not found"
end