lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.3.6 vs lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.3.7

- old
+ new

@@ -38,11 +38,11 @@
       def reset_fields(fields = nil)
         @fields = fields
         self.fields
       end
 
-      def with_retry_job(&block)
+      def with_job_retry(&block)
         retries = 0
         begin
           yield
         rescue BackendError, InternalError => e
           if retries < @task['retries']
@@ -57,11 +57,11 @@
       end
 
       # @params gcs_patsh [Array] arary of gcs paths such as gs://bucket/path
       # @return [Array] responses
       def load_from_gcs(object_uris, table)
-        with_retry_job do
+        with_job_retry do
           begin
             # As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says,
             # we should generate job_id in client code, otherwise, retrying would cause duplication
             if @task['prevent_duplicate_insert'] and (@task['mode'] == 'append' or @task['mode'] == 'append_direct')
               job_id = Helper.create_load_job_id(@task, path, fields)
@@ -97,11 +97,11 @@
             }
           }
           opts = {}
           Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
-          response = client.insert_job(@project, body, opts)
+          response = with_network_retry { client.insert_job(@project, body, opts) }
           unless @task['is_skip_job_result_check']
             response = wait_load('Load', response)
           end
           [response]
         rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
@@ -126,27 +126,27 @@
         # So, we dropped it. See https://github.com/embulk/embulk-output-bigquery/pull/35
         responses = []
         threads = []
         Embulk.logger.debug { "embulk-output-bigquery: LOAD IN PARALLEL #{paths}" }
         paths.each_with_index do |path, idx|
-          threads << Thread.new do
+          threads << Thread.new(path, idx) do |path, idx|
             # I am not sure whether google-api-ruby-client is thread-safe,
             # so let me create new instances for each thread for safe
             bigquery = self.class.new(@task, @schema, fields)
             response = bigquery.load(path, table)
             [idx, response]
           end
         end
         ThreadsWait.all_waits(*threads) do |th|
           idx, response = th.value # raise errors occurred in threads
-          responses[idx] = response if idx
+          responses[idx] = response
         end
         responses
       end
 
       def load(path, table)
-        with_retry_job do
+        with_job_retry do
           begin
             if File.exist?(path)
               # As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says,
               # we should generate job_id in client code, otherwise, retrying would cause duplication
               if @task['prevent_duplicate_insert'] and (@task['mode'] == 'append' or @task['mode'] == 'append_direct')
@@ -194,11 +194,11 @@
             # timeout_sec: @task['timeout_sec'],
             # open_timeout_sec: @task['open_timeout_sec']
             # },
           }
           Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
-          response = client.insert_job(@project, body, opts)
+          response = with_network_retry { client.insert_job(@project, body, opts) }
           if @task['is_skip_job_result_check']
             response
           else
             response = wait_load('Load', response)
           end
@@ -211,11 +211,11 @@
           end
         end
       end
 
       def copy(source_table, destination_table, destination_dataset = nil, write_disposition: 'WRITE_TRUNCATE')
-        with_retry_job do
+        with_job_retry do
           begin
             destination_dataset ||= @dataset
             job_id = "embulk_copy_job_#{SecureRandom.uuid}"
 
             Embulk.logger.info {
@@ -246,11 +246,11 @@
             }
           }
           opts = {}
           Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
-          response = client.insert_job(@project, body, opts)
+          response = with_network_retry { client.insert_job(@project, body, opts) }
           wait_load('Copy', response)
         rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
           response = {status_code: e.status_code, message: e.message, error_class: e.class}
           Embulk.logger.error {
             "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
@@ -287,11 +287,11 @@
             Embulk.logger.info {
               "embulk-output-bigquery: #{kind} job checking... " \
               "job_id:[#{job_id}] elapsed_time:#{elapsed.to_f}sec status:[#{status}]"
             }
             sleep wait_interval
-            _response = client.get_job(@project, job_id)
+            _response = with_network_retry { client.get_job(@project, job_id) }
           end
         end
 
         # cf. http://www.rubydoc.info/github/google/google-api-ruby-client/Google/Apis/BigqueryV2/JobStatus#errors-instance_method
         # `errors` returns Array<Google::Apis::BigqueryV2::ErrorProto> if any error exists.
@@ -328,11 +328,11 @@
             dataset_id: dataset,
           },
         }.merge(hint)
         opts = {}
         Embulk.logger.debug { "embulk-output-bigquery: insert_dataset(#{@project}, #{dataset}, #{body}, #{opts})" }
-        with_network_retry { client.insert_dataset(@project, body, opts) }
+        with_network_retry { client.insert_dataset(@project, body, opts) }
       rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
         if e.status_code == 409 && /Already Exists:/ =~ e.message
           # ignore 'Already Exists' error
           return
         end
@@ -347,11 +347,11 @@
 
       def get_dataset(dataset = nil)
        dataset ||= @dataset
        begin
          Embulk.logger.info { "embulk-output-bigquery: Get dataset... #{@project}:#{@dataset}" }
-         client.get_dataset(@project, dataset)
+         with_network_retry { client.get_dataset(@project, dataset) }
       rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
         if e.status_code == 404
           raise NotFoundError, "Dataset #{@project}:#{dataset} is not found"
         end
@@ -374,11 +374,11 @@
             fields: fields,
           }
         }
         opts = {}
         Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@project}, #{@dataset}, #{body}, #{opts})" }
-        client.insert_table(@project, @dataset, body, opts)
+        with_network_retry { client.insert_table(@project, @dataset, body, opts) }
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
        if e.status_code == 409 && /Already Exists:/ =~ e.message
          # ignore 'Already Exists' error
          return
        end
@@ -392,11 +392,11 @@
      end
 
      def delete_table(table)
        begin
          Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@project}:#{@dataset}.#{table}" }
-         client.delete_table(@project, @dataset, table)
+         with_network_retry { client.delete_table(@project, @dataset, table) }
       rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
         if e.status_code == 404 && /Not found:/ =~ e.message
           # ignore 'Not Found' error
           return
         end
@@ -410,10 +410,10 @@
      end
 
      def get_table(table)
        begin
          Embulk.logger.info { "embulk-output-bigquery: Get table... #{@project}:#{@dataset}.#{table}" }
-         client.get_table(@project, @dataset, table)
+         with_network_retry { client.get_table(@project, @dataset, table) }
       rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
         if e.status_code == 404
           raise NotFoundError, "Table #{@project}:#{@dataset}.#{table} is not found"
         end