lib/fluent/plugin/bigquery/writer.rb: fluent-plugin-bigquery-1.2.0 vs fluent-plugin-bigquery-2.0.0.beta
- old
+ new
@@ -5,26 +5,19 @@
@auth_method = auth_method
@scope = "https://www.googleapis.com/auth/bigquery"
@options = options
@log = log
@num_errors_per_chunk = {}
-
- @cached_client_expiration = Time.now + 1800
end
def client
- return @client if @client && @cached_client_expiration > Time.now
-
- client = Google::Apis::BigqueryV2::BigqueryService.new.tap do |cl|
+ @client ||= Google::Apis::BigqueryV2::BigqueryService.new.tap do |cl|
cl.authorization = get_auth
cl.client_options.open_timeout_sec = @options[:open_timeout_sec] if @options[:open_timeout_sec]
cl.client_options.read_timeout_sec = @options[:timeout_sec] if @options[:timeout_sec]
cl.client_options.send_timeout_sec = @options[:timeout_sec] if @options[:timeout_sec]
end
-
- @cached_client_expiration = Time.now + 1800
- @client = client
end
def create_table(project, dataset, table_id, record_schema)
create_table_retry_limit = 3
create_table_retry_wait = 1
@@ -47,14 +40,11 @@
expiration_ms: @options[:time_partitioning_expiration] ? @options[:time_partitioning_expiration] * 1000 : nil
}.select { |_, value| !value.nil? }
end
client.insert_table(project, dataset, definition, {})
log.debug "create table", project_id: project, dataset: dataset, table: table_id
- @client = nil
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
- @client = nil
-
message = e.message
if e.status_code == 409 && /Already Exists:/ =~ message
log.debug "already created table", project_id: project, dataset: dataset, table: table_id
# ignore 'Already Exists' error
return
@@ -79,11 +69,10 @@
schema = Fluent::BigQuery::Helper.deep_stringify_keys(res.schema.to_h[:fields])
log.debug "Load schema from BigQuery: #{project}:#{dataset}.#{table_id} #{schema}"
schema
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
- @client = nil
message = e.message
log.error "tables.get API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message
nil
end
@@ -109,12 +98,10 @@
raise Fluent::BigQuery::UnRetryableError.new("failed to insert into bigquery(insert errors), and cannot retry")
end
end
end
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
- @client = nil
-
reason = e.respond_to?(:reason) ? e.reason : nil
error_data = { project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason }
wrapped = Fluent::BigQuery::Error.wrap(e)
if wrapped.retryable?
log.warn "tabledata.insertAll API", error_data
@@ -123,11 +110,21 @@
end
raise wrapped
end
- def create_load_job(chunk_id, project, dataset, table_id, upload_source, fields)
+ JobReference = Struct.new(:chunk_id, :chunk_id_hex, :project_id, :dataset_id, :table_id, :job_id) do
+ def as_hash(*keys)
+ if keys.empty?
+ to_h
+ else
+ to_h.select { |k, _| keys.include?(k) }
+ end
+ end
+ end
+
+ def create_load_job(chunk_id, chunk_id_hex, project, dataset, table_id, upload_source, fields)
configuration = {
configuration: {
load: {
destination_table: {
project_id: project,
@@ -143,11 +140,11 @@
max_bad_records: @options[:max_bad_records],
}
}
}
- job_id = create_job_id(chunk_id, dataset, table_id, fields.to_a) if @options[:prevent_duplicate_load]
+ job_id = create_job_id(chunk_id_hex, dataset, table_id, fields.to_a) if @options[:prevent_duplicate_load]
configuration[:configuration][:load].merge!(create_disposition: "CREATE_NEVER") if @options[:time_partitioning_type]
configuration.merge!({job_reference: {project_id: project, job_id: job_id}}) if job_id
# If the target table already exists, omit the schema configuration;
# this makes later schema changes easier.
@@ -165,15 +162,12 @@
{
upload_source: upload_source,
content_type: "application/octet-stream",
}
)
- wait_load_job(chunk_id, project, dataset, res.job_reference.job_id, table_id)
- @num_errors_per_chunk.delete(chunk_id)
+ JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
- @client = nil
-
reason = e.respond_to?(:reason) ? e.reason : nil
log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
if @options[:auto_create_table] && e.status_code == 404 && /Not Found: Table/i =~ e.message
# Table Not Found: Auto Create Table
@@ -185,48 +179,60 @@
)
raise "table created. send rows next time."
end
if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
- wait_load_job(chunk_id, project, dataset, job_id, table_id)
- @num_errors_per_chunk.delete(chunk_id)
- return
+ return JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, job_id)
end
raise Fluent::BigQuery::Error.wrap(e)
end
- def wait_load_job(chunk_id, project, dataset, job_id, table_id)
- wait_interval = 10
- _response = client.get_job(project, job_id)
+ def fetch_load_job(job_reference)
+ project = job_reference.project_id
+ job_id = job_reference.job_id
- until _response.status.state == "DONE"
- log.debug "wait for load job finish", state: _response.status.state, job_id: _response.job_reference.job_id
- sleep wait_interval
- _response = client.get_job(project, _response.job_reference.job_id)
+ res = client.get_job(project, job_id)
+ log.debug "load job fetched", id: job_id, state: res.status.state, **job_reference.as_hash(:project_id, :dataset_id, :table_id)
+
+ if res.status.state == "DONE"
+ res
end
+ rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
+ e = Fluent::BigQuery::Error.wrap(e)
+ raise e unless e.retryable?
+ end
- errors = _response.status.errors
+ def commit_load_job(chunk_id_hex, response)
+ job_id = response.id
+ project = response.configuration.load.destination_table.project_id
+ dataset = response.configuration.load.destination_table.dataset_id
+ table_id = response.configuration.load.destination_table.table_id
+
+ errors = response.status.errors
if errors
errors.each do |e|
- log.error "job.insert API (rows)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: e.message, reason: e.reason
+ log.error "job.load API (rows)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: e.message, reason: e.reason
end
end
- error_result = _response.status.error_result
+ error_result = response.status.error_result
if error_result
- log.error "job.insert API (result)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: error_result.message, reason: error_result.reason
+ log.error "job.load API (result)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: error_result.message, reason: error_result.reason
if Fluent::BigQuery::Error.retryable_error_reason?(error_result.reason)
- @num_errors_per_chunk[chunk_id] = @num_errors_per_chunk[chunk_id].to_i + 1
+ @num_errors_per_chunk[chunk_id_hex] = @num_errors_per_chunk[chunk_id_hex].to_i + 1
raise Fluent::BigQuery::RetryableError.new("failed to load into bigquery, retry")
else
- @num_errors_per_chunk.delete(chunk_id)
+ @num_errors_per_chunk.delete(chunk_id_hex)
raise Fluent::BigQuery::UnRetryableError.new("failed to load into bigquery, and cannot retry")
end
end
- log.debug "finish load job", state: _response.status.state
+ stats = response.statistics.load
+ duration = (response.statistics.end_time - response.statistics.creation_time) / 1000.0
+ log.debug "load job finished", id: job_id, state: response.status.state, input_file_bytes: stats.input_file_bytes, input_files: stats.input_files, output_bytes: stats.output_bytes, output_rows: stats.output_rows, bad_records: stats.bad_records, duration: duration.round(2), project_id: project, dataset: dataset, table: table_id
+ @num_errors_per_chunk.delete(chunk_id_hex)
end
private
def log
@@ -289,11 +295,11 @@
def safe_table_id(table_id)
table_id.gsub(/\$\d+$/, "")
end
- def create_job_id(chunk_id, dataset, table, schema)
- job_id_key = "#{chunk_id}#{dataset}#{table}#{schema.to_s}#{@options[:max_bad_records]}#{@options[:ignore_unknown_values]}#{@num_errors_per_chunk[chunk_id]}"
+ def create_job_id(chunk_id_hex, dataset, table, schema)
+ job_id_key = "#{chunk_id_hex}#{dataset}#{table}#{schema.to_s}#{@options[:max_bad_records]}#{@options[:ignore_unknown_values]}#{@num_errors_per_chunk[chunk_id_hex]}"
@log.debug "job_id_key: #{job_id_key}"
"fluentd_job_" + Digest::SHA1.hexdigest(job_id_key)
end
def source_format
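
The net effect of the changes above is that create_load_job no longer blocks on the load job: it returns a JobReference, and the old wait_load_job is split into fetch_load_job (poll once, return the response only when the job state is "DONE") and commit_load_job (log statistics, track per-chunk errors, raise retryable or unretryable errors). A minimal caller sketch of that flow follows; writer, chunk, upload_source, fields, the project/dataset/table names, and the 10-second polling loop are illustrative assumptions, and only the method names and signatures come from the diff itself.

# Hypothetical caller sketch (not part of the gem) showing the new call order.
# Only create_load_job / fetch_load_job / commit_load_job and JobReference are
# taken from the diff; writer, chunk, upload_source and fields are assumed.
chunk_id = chunk.unique_id                  # raw binary chunk id from Fluentd
chunk_id_hex = chunk_id.unpack("H*").first  # hex form used for job ids and error bookkeeping

# Kick off the load job; this now returns a JobReference immediately instead of waiting.
job_reference = writer.create_load_job(chunk_id, chunk_id_hex, "my-project", "my_dataset", "my_table", upload_source, fields)

# Poll until fetch_load_job returns a response; it returns nil while the job is
# still running or when a retryable API error occurs.
response = nil
until (response = writer.fetch_load_job(job_reference))
  sleep 10
end

# Record the result: logs load statistics, clears the per-chunk error counter on
# success, and raises RetryableError / UnRetryableError on failure.
writer.commit_load_job(chunk_id_hex, response)

In the plugin itself the fetch and commit steps would be expected to run out of band rather than in a blocking loop; the sleep above is only there to make the call order explicit.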