lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.3.2 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.3.3
- old
+ new
@@ -1,47 +1,8 @@
module Fluent
module BigQuery
class Writer
- RETRYABLE_ERROR_REASON = %w(backendError internalError rateLimitExceeded tableUnavailable).freeze
-
- class Error < StandardError
- attr_reader :origin
-
- def initialize(message, origin = nil)
- @origin = origin
- super(message || origin.message)
- end
-
- def method_missing(name, *args)
- if @origin
- @origin.send(name, *args)
- else
- super
- end
- end
-
- def reason
- @origin && @origin.respond_to?(:reason) ? @origin.reason : nil
- end
-
- def status_code
- @origin && @origin.respond_to?(:status_code) ? @origin.status_code : nil
- end
-
- def retryable?
- false
- end
- end
-
- class UnRetryableError < Error; end
-
- class RetryableError < Error
- def retryable?
- true
- end
- end
-
def initialize(log, auth_method, auth_options = {})
@auth_method = auth_method
@scope = "https://www.googleapis.com/auth/bigquery"
@auth_options = auth_options
@log = log
@@ -95,17 +56,17 @@
end
reason = e.respond_to?(:reason) ? e.reason : nil
log.error "tables.insert API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message, reason: reason
- if RETRYABLE_ERROR_REASON.include?(reason) && create_table_retry_count < create_table_retry_limit
+ if Fluent::BigQuery::Error.retryable_error_reason?(reason) && create_table_retry_count < create_table_retry_limit
sleep create_table_retry_wait
create_table_retry_wait *= 2
create_table_retry_count += 1
retry
else
- raise UnRetryableError.new("failed to create table in bigquery", e)
+ raise Fluent::BigQuery::UnRetryableError.new("failed to create table in bigquery", e)
end
end
end
def fetch_schema(project, dataset, table_id)
@@ -137,15 +98,11 @@
@client = nil
reason = e.respond_to?(:reason) ? e.reason : nil
log.error "tabledata.insertAll API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
- if RETRYABLE_ERROR_REASON.include?(reason)
- raise RetryableError.new(nil, e)
- else
- raise UnRetryableError.new(nil, e)
- end
+ raise Fluent::BigQuery::Error.wrap(e)
end
def create_load_job(project, dataset, table_id, upload_source, job_id, fields, ignore_unknown_values: false, max_bad_records: 0, timeout_sec: nil, open_timeout_sec: 60, auto_create_table: nil, time_partitioning_type: nil, time_partitioning_expiration: nil)
configuration = {
configuration: {
@@ -173,11 +130,11 @@
begin
if client.get_table(project, dataset, table_id)
configuration[:configuration][:load].delete(:schema)
end
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError
- raise UnRetryableError.new("Schema is empty") if fields.empty?
+ raise Fluent::BigQuery::UnRetryableError.new("Schema is empty") if fields.empty?
end
res = client.insert_job(
project,
configuration,
@@ -203,15 +160,11 @@
raise "table created. send rows next time."
end
return wait_load_job(project, dataset, job_id, table_id) if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
- if RETRYABLE_ERROR_REASON.include?(reason) || e.is_a?(Google::Apis::ServerError)
- raise RetryableError.new(nil, e)
- else
- raise UnRetryableError.new(nil, e)
- end
+ raise Fluent::BigQuery::Error.wrap(e)
end
def wait_load_job(project, dataset, job_id, table_id, retryable: true)
wait_interval = 10
_response = client.get_job(project, job_id)
@@ -230,13 +183,13 @@
end
error_result = _response.status.error_result
if error_result
log.error "job.insert API (result)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: error_result.message, reason: error_result.reason
- if retryable && RETRYABLE_ERROR_REASON.include?(error_result.reason)
- raise RetryableError.new("failed to load into bigquery, retry")
+ if retryable && Fluent::BigQuery::Error.retryable_error_reason?(error_result.reason)
+ raise Fluent::BigQuery::RetryableError.new("failed to load into bigquery, retry")
else
- raise UnRetryableError.new("failed to load into bigquery, and cannot retry")
+ raise Fluent::BigQuery::UnRetryableError.new("failed to load into bigquery, and cannot retry")
end
end
log.debug "finish load job", state: _response.status.state
end