lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-2.3.0 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-3.0.0
- old
+ new
@@ -1,9 +1,9 @@
module Fluent
module BigQuery
class Writer
- def initialize(log, auth_method, options = {})
+ def initialize(log, auth_method, **options)
@auth_method = auth_method
@scope = "https://www.googleapis.com/auth/bigquery"
@options = options
@log = log
@num_errors_per_chunk = {}
@@ -35,11 +35,11 @@
}
definition.merge!(time_partitioning: time_partitioning) if time_partitioning
definition.merge!(require_partition_filter: require_partition_filter) if require_partition_filter
definition.merge!(clustering: clustering) if clustering
- client.insert_table(project, dataset, definition, {})
+ client.insert_table(project, dataset, definition, **{})
log.debug "create table", project_id: project, dataset: dataset, table: table_id
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
message = e.message
if e.status_code == 409 && /Already Exists:/ =~ message
log.debug "already created table", project_id: project, dataset: dataset, table: table_id
@@ -81,11 +81,11 @@
body.merge!(template_suffix: template_suffix) if template_suffix
if @options[:auto_create_table]
res = insert_all_table_data_with_create_table(project, dataset, table_id, body, schema)
else
- res = client.insert_all_table_data(project, dataset, table_id, body, {})
+ res = client.insert_all_table_data(project, dataset, table_id, body, **{})
end
log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size
if res.insert_errors && !res.insert_errors.empty?
log.warn "insert errors", project_id: project, dataset: dataset, table: table_id, insert_errors: res.insert_errors.to_s
@@ -156,14 +156,12 @@
end
res = client.insert_job(
project,
configuration,
- {
- upload_source: upload_source,
- content_type: "application/octet-stream",
- }
+ upload_source: upload_source,
+ content_type: "application/octet-stream",
)
JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message
@@ -341,10 +339,10 @@
end
end
def insert_all_table_data_with_create_table(project, dataset, table_id, body, schema)
try_count ||= 1
- res = client.insert_all_table_data(project, dataset, table_id, body, {})
+ res = client.insert_all_table_data(project, dataset, table_id, body, **{})
rescue Google::Apis::ClientError => e
if e.status_code == 404 && /Not Found: Table/i =~ e.message
if try_count == 1
# Table Not Found: Auto Create Table
create_table(project, dataset, table_id, schema)