lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.4.1 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.4.2
- removed in 0.4.2 (old, 0.4.1)
+ added in 0.4.2 (new)
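
What changed in 0.4.2: every tuning knob moves out of the individual method signatures and into a single options hash handed to Writer#initialize (stored as @options). create_table, insert_rows and create_load_job keep only their per-call data and read everything else from @options. The sketch below shows the new call pattern; the logger, the auth_method value and all option values are illustrative, since the plugin code that actually builds the writer is not part of this diff.

    require "logger"

    log         = Logger.new($stdout)  # stand-in for the Fluentd plugin logger
    auth_method = :compute_engine      # illustrative; the dispatch on @auth_method is outside this diff

    writer = Fluent::BigQuery::Writer.new(log, auth_method,
      # settings that 0.4.1 expected as keyword arguments on every call:
      skip_invalid_rows:         true,
      ignore_unknown_values:     true,
      allow_retry_insert_errors: false,
      timeout_sec:               60,
      open_timeout_sec:          60,
    )

    # 0.4.1: writer.insert_rows(project, dataset, table, rows,
    #                           skip_invalid_rows: true, ignore_unknown_values: true,
    #                           timeout_sec: 60, open_timeout_sec: 60)
    # 0.4.2: writer.insert_rows(project, dataset, table, rows)
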
@@ -1,12 +1,12 @@
module Fluent
module BigQuery
class Writer
- def initialize(log, auth_method, auth_options = {})
+ def initialize(log, auth_method, options = {})
@auth_method = auth_method
@scope = "https://www.googleapis.com/auth/bigquery"
- @auth_options = auth_options
+ @options = options
@log = log
@num_errors_per_chunk = {}
@cached_client_expiration = Time.now + 1800
end
@@ -20,11 +20,11 @@
@cached_client_expiration = Time.now + 1800
@client = client
end
- def create_table(project, dataset, table_id, record_schema, time_partitioning_type: nil, time_partitioning_expiration: nil)
+ def create_table(project, dataset, table_id, record_schema)
create_table_retry_limit = 3
create_table_retry_wait = 1
create_table_retry_count = 0
table_id = safe_table_id(table_id)
@@ -36,14 +36,14 @@
schema: {
fields: record_schema.to_a,
}
}
- if time_partitioning_type
+ if @options[:time_partitioning_type]
definition[:time_partitioning] = {
- type: time_partitioning_type.to_s.upcase,
- expiration_ms: time_partitioning_expiration ? time_partitioning_expiration * 1000 : nil
+ type: @options[:time_partitioning_type].to_s.upcase,
+ expiration_ms: @options[:time_partitioning_expiration] ? @options[:time_partitioning_expiration] * 1000 : nil
}.compact
end
client.insert_table(project, dataset, definition, {})
log.debug "create table", project_id: project, dataset: dataset, table: table_id
@client = nil
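
For illustration, a standalone sketch of what the @options-driven branch above produces; it mirrors the diff's code with made-up values rather than quoting it.

    options    = { time_partitioning_type: :day, time_partitioning_expiration: 3600 }  # illustrative
    definition = { schema: { fields: [] } }  # trimmed-down table definition

    if options[:time_partitioning_type]
      definition[:time_partitioning] = {
        type: options[:time_partitioning_type].to_s.upcase,  # => "DAY"
        expiration_ms: options[:time_partitioning_expiration] ? options[:time_partitioning_expiration] * 1000 : nil  # => 3_600_000
      }.compact  # compact drops expiration_ms when no expiration is configured
    end
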
@@ -82,25 +82,25 @@
message = e.message
log.error "tables.get API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message
nil
end
- def insert_rows(project, dataset, table_id, rows, skip_invalid_rows: false, ignore_unknown_values: false, template_suffix: nil, timeout_sec: nil, open_timeout_sec: 60, allow_retry_insert_errors: false)
+ def insert_rows(project, dataset, table_id, rows, template_suffix: nil)
body = {
rows: rows,
- skip_invalid_rows: skip_invalid_rows,
- ignore_unknown_values: ignore_unknown_values,
+ skip_invalid_rows: @options[:skip_invalid_rows],
+ ignore_unknown_values: @options[:ignore_unknown_values],
}
body.merge!(template_suffix: template_suffix) if template_suffix
res = client.insert_all_table_data(project, dataset, table_id, body, {
- options: {timeout_sec: timeout_sec, open_timeout_sec: open_timeout_sec}
+ options: {timeout_sec: @options[:timeout_sec], open_timeout_sec: @options[:open_timeout_sec]}
})
log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size
if res.insert_errors && !res.insert_errors.empty?
log.warn "insert errors", project_id: project, dataset: dataset, table: table_id, insert_errors: res.insert_errors.to_s
- if allow_retry_insert_errors
+ if @options[:allow_retry_insert_errors]
is_included_any_retryable_insert_error = res.insert_errors.any? do |insert_error|
insert_error.errors.any? { |error| Fluent::BigQuery::Error.retryable_insert_errors_reason?(error.reason) }
end
if is_included_any_retryable_insert_error
raise Fluent::BigQuery::RetryableError.new("failed to insert into bigquery(insert errors), retry")
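
template_suffix stays a per-call keyword because it varies from chunk to chunk, while skip_invalid_rows, ignore_unknown_values, the timeouts and allow_retry_insert_errors now come from @options. A hedged call sketch, reusing the writer built in the sketch near the top; project, dataset, table, suffix and the row payload are made up (the plugin formats the rows upstream):

    rows = [{ json: { "time" => Time.now.to_i, "message" => "hello" } }]  # row shape assumed from the insertAll API
    writer.insert_rows("my-project", "my_dataset", "access_log", rows,
                       template_suffix: "_20170101")
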
@@ -116,11 +116,11 @@
log.error "tabledata.insertAll API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
raise Fluent::BigQuery::Error.wrap(e)
end
- def create_load_job(chunk_id, project, dataset, table_id, upload_source, fields, prevent_duplicate_load: false, ignore_unknown_values: false, max_bad_records: 0, timeout_sec: nil, open_timeout_sec: 60, auto_create_table: nil, time_partitioning_type: nil, time_partitioning_expiration: nil)
+ def create_load_job(chunk_id, project, dataset, table_id, upload_source, fields)
configuration = {
configuration: {
load: {
destination_table: {
project_id: project,
@@ -130,18 +130,18 @@
schema: {
fields: fields.to_a,
},
write_disposition: "WRITE_APPEND",
source_format: "NEWLINE_DELIMITED_JSON",
- ignore_unknown_values: ignore_unknown_values,
- max_bad_records: max_bad_records,
+ ignore_unknown_values: @options[:ignore_unknown_values],
+ max_bad_records: @options[:max_bad_records],
}
}
}
- job_id = create_job_id(chunk_id, dataset, table_id, fields.to_a, max_bad_records, ignore_unknown_values) if prevent_duplicate_load
- configuration[:configuration][:load].merge!(create_disposition: "CREATE_NEVER") if time_partitioning_type
+ job_id = create_job_id(chunk_id, dataset, table_id, fields.to_a) if @options[:prevent_duplicate_load]
+ configuration[:configuration][:load].merge!(create_disposition: "CREATE_NEVER") if @options[:time_partitioning_type]
configuration.merge!({job_reference: {project_id: project, job_id: job_id}}) if job_id
# If the target table already exists, omit the schema configuration,
# because changing the schema is easier.
begin
@@ -157,12 +157,12 @@
configuration,
{
upload_source: upload_source,
content_type: "application/octet-stream",
options: {
- timeout_sec: timeout_sec,
- open_timeout_sec: open_timeout_sec,
+ timeout_sec: @options[:timeout_sec],
+ open_timeout_sec: @options[:open_timeout_sec],
}
}
)
wait_load_job(chunk_id, project, dataset, res.job_reference.job_id, table_id)
@num_errors_per_chunk.delete(chunk_id)
@@ -170,18 +170,23 @@
@client = nil
reason = e.respond_to?(:reason) ? e.reason : nil
log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
- if auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ e.message
+ if @options[:auto_create_table] && e.status_code == 404 && /Not Found: Table/i =~ e.message
# Table Not Found: Auto Create Table
- create_table(project, dataset, table_id, fields, time_partitioning_type: time_partitioning_type, time_partitioning_expiration: time_partitioning_expiration)
+ create_table(
+ project,
+ dataset,
+ table_id,
+ fields,
+ )
raise "table created. send rows next time."
end
if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
- wait_load_job(chunk_id, project, dataset, job_id, table_id)
+ wait_load_job(chunk_id, project, dataset, job_id, table_id)
@num_errors_per_chunk.delete(chunk_id)
return
end
raise Fluent::BigQuery::Error.wrap(e)
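
The load-job path follows the same pattern: prevent_duplicate_load, ignore_unknown_values, max_bad_records, auto_create_table and the time-partitioning settings are constructor options now, which is why the rescue above can call create_table without forwarding any of them. A sketch of a writer configured for load jobs with auto table creation (log and auth_method as in the first sketch; chunk_id, upload_source and fields are supplied per buffer chunk by the output plugin):

    writer = Fluent::BigQuery::Writer.new(log, auth_method,
      auto_create_table:            true,
      prevent_duplicate_load:       true,
      ignore_unknown_values:        true,
      max_bad_records:              0,
      time_partitioning_type:       :day,
      time_partitioning_expiration: 3600,
    )
    writer.create_load_job(chunk_id, "my-project", "my_dataset", "access_log", upload_source, fields)
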
@@ -240,13 +245,13 @@
end
end
def get_auth_from_private_key
require 'google/api_client/auth/key_utils'
- private_key_path = @auth_options[:private_key_path]
- private_key_passphrase = @auth_options[:private_key_passphrase]
- email = @auth_options[:email]
+ private_key_path = @options[:private_key_path]
+ private_key_passphrase = @options[:private_key_passphrase]
+ email = @options[:email]
key = Google::APIClient::KeyUtils.load_from_pkcs12(private_key_path, private_key_passphrase)
Signet::OAuth2::Client.new(
token_credential_uri: "https://accounts.google.com/o/oauth2/token",
audience: "https://accounts.google.com/o/oauth2/token",
@@ -259,11 +264,11 @@
def get_auth_from_compute_engine
Google::Auth::GCECredentials.new
end
def get_auth_from_json_key
- json_key = @auth_options[:json_key]
+ json_key = @options[:json_key]
begin
JSON.parse(json_key)
key = StringIO.new(json_key)
Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: key, scope: @scope)
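
Credentials ride in the same hash: @auth_options is gone, and get_auth_from_private_key / get_auth_from_json_key read their key material from @options. A sketch, assuming :json_key is the auth_method value that routes to get_auth_from_json_key (the dispatch is not shown in this diff) and using a hypothetical key path; log as in the first sketch:

    writer = Fluent::BigQuery::Writer.new(log, :json_key,
      json_key: File.read("/etc/fluentd/bq-service-account.json"),  # hypothetical key file
      skip_invalid_rows: true)  # auth and tuning options share one hash
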
@@ -281,11 +286,11 @@
def safe_table_id(table_id)
table_id.gsub(/\$\d+$/, "")
end
- def create_job_id(chunk_id, dataset, table, schema, max_bad_records, ignore_unknown_values)
- job_id_key = "#{chunk_id}#{dataset}#{table}#{schema.to_s}#{max_bad_records}#{ignore_unknown_values}#{@num_errors_per_chunk[chunk_id]}"
+ def create_job_id(chunk_id, dataset, table, schema)
+ job_id_key = "#{chunk_id}#{dataset}#{table}#{schema.to_s}#{@options[:max_bad_records]}#{@options[:ignore_unknown_values]}#{@num_errors_per_chunk[chunk_id]}"
@log.debug "job_id_key: #{job_id_key}"
"fluentd_job_" + Digest::SHA1.hexdigest(job_id_key)
end
end
end
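
create_job_id still derives a deterministic job id from the chunk, the destination, the schema, the load settings (now read from @options) and the per-chunk error count. A chunk that is re-sent with unchanged configuration therefore produces the same BigQuery job id, the duplicate load fails with 409, and the writer waits on the original job instead of loading the data twice. A standalone sketch of that property with illustrative values:

    require "digest"

    chunk_id, dataset, table = "chunk-1", "my_dataset", "access_log"
    schema  = [{ name: "time", type: "TIMESTAMP" }]
    options = { max_bad_records: 0, ignore_unknown_values: true }
    num_errors_per_chunk = { "chunk-1" => 0 }

    job_id_key = "#{chunk_id}#{dataset}#{table}#{schema}#{options[:max_bad_records]}" \
                 "#{options[:ignore_unknown_values]}#{num_errors_per_chunk[chunk_id]}"
    job_id = "fluentd_job_" + Digest::SHA1.hexdigest(job_id_key)
    # Re-running this with identical inputs yields the identical job_id.
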