lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.3.2 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.3.3

- old
+ new

@@ -1,47 +1,8 @@ module Fluent module BigQuery class Writer - RETRYABLE_ERROR_REASON = %w(backendError internalError rateLimitExceeded tableUnavailable).freeze - - class Error < StandardError - attr_reader :origin - - def initialize(message, origin = nil) - @origin = origin - super(message || origin.message) - end - - def method_missing(name, *args) - if @origin - @origin.send(name, *args) - else - super - end - end - - def reason - @origin && @origin.respond_to?(:reason) ? @origin.reason : nil - end - - def status_code - @origin && @origin.respond_to?(:status_code) ? @origin.status_code : nil - end - - def retryable? - false - end - end - - class UnRetryableError < Error; end - - class RetryableError < Error - def retryable? - true - end - end - def initialize(log, auth_method, auth_options = {}) @auth_method = auth_method @scope = "https://www.googleapis.com/auth/bigquery" @auth_options = auth_options @log = log @@ -95,17 +56,17 @@ end reason = e.respond_to?(:reason) ? e.reason : nil log.error "tables.insert API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message, reason: reason - if RETRYABLE_ERROR_REASON.include?(reason) && create_table_retry_count < create_table_retry_limit + if Fluent::BigQuery::Error.retryable_error_reason?(reason) && create_table_retry_count < create_table_retry_limit sleep create_table_retry_wait create_table_retry_wait *= 2 create_table_retry_count += 1 retry else - raise UnRetryableError.new("failed to create table in bigquery", e) + raise Fluent::BigQuery::UnRetryableError.new("failed to create table in bigquery", e) end end end def fetch_schema(project, dataset, table_id) @@ -137,15 +98,11 @@ @client = nil reason = e.respond_to?(:reason) ? e.reason : nil log.error "tabledata.insertAll API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason - if RETRYABLE_ERROR_REASON.include?(reason) - raise RetryableError.new(nil, e) - else - raise UnRetryableError.new(nil, e) - end + raise Fluent::BigQuery::Error.wrap(e) end def create_load_job(project, dataset, table_id, upload_source, job_id, fields, ignore_unknown_values: false, max_bad_records: 0, timeout_sec: nil, open_timeout_sec: 60, auto_create_table: nil, time_partitioning_type: nil, time_partitioning_expiration: nil) configuration = { configuration: { @@ -173,11 +130,11 @@ begin if client.get_table(project, dataset, table_id) configuration[:configuration][:load].delete(:schema) end rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError - raise UnRetryableError.new("Schema is empty") if fields.empty? + raise Fluent::BigQuery::UnRetryableError.new("Schema is empty") if fields.empty? end res = client.insert_job( project, configuration, @@ -203,15 +160,11 @@ raise "table created. send rows next time." end return wait_load_job(project, dataset, job_id, table_id) if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job - if RETRYABLE_ERROR_REASON.include?(reason) || e.is_a?(Google::Apis::ServerError) - raise RetryableError.new(nil, e) - else - raise UnRetryableError.new(nil, e) - end + raise Fluent::BigQuery::Error.wrap(e) end def wait_load_job(project, dataset, job_id, table_id, retryable: true) wait_interval = 10 _response = client.get_job(project, job_id) @@ -230,13 +183,13 @@ end error_result = _response.status.error_result if error_result log.error "job.insert API (result)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: error_result.message, reason: error_result.reason - if retryable && RETRYABLE_ERROR_REASON.include?(error_result.reason) - raise RetryableError.new("failed to load into bigquery, retry") + if retryable && Fluent::BigQuery::Error.retryable_error_reason?(error_result.reason) + raise Fluent::BigQuery::RetryableError.new("failed to load into bigquery, retry") else - raise UnRetryableError.new("failed to load into bigquery, and cannot retry") + raise Fluent::BigQuery::UnRetryableError.new("failed to load into bigquery, and cannot retry") end end log.debug "finish load job", state: _response.status.state end