lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-2.3.0 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-3.0.0

- old
+ new

@@ -1,9 +1,9 @@
 module Fluent
   module BigQuery
     class Writer
-      def initialize(log, auth_method, options = {})
+      def initialize(log, auth_method, **options)
         @auth_method = auth_method
         @scope = "https://www.googleapis.com/auth/bigquery"
         @options = options
         @log = log
         @num_errors_per_chunk = {}
@@ -35,11 +35,11 @@
         }
         definition.merge!(time_partitioning: time_partitioning) if time_partitioning
         definition.merge!(require_partition_filter: require_partition_filter) if require_partition_filter
         definition.merge!(clustering: clustering) if clustering
-        client.insert_table(project, dataset, definition, {})
+        client.insert_table(project, dataset, definition, **{})
         log.debug "create table", project_id: project, dataset: dataset, table: table_id
       rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
         message = e.message
         if e.status_code == 409 && /Already Exists:/ =~ message
           log.debug "already created table", project_id: project, dataset: dataset, table: table_id
@@ -81,11 +81,11 @@
         body.merge!(template_suffix: template_suffix) if template_suffix
         if @options[:auto_create_table]
           res = insert_all_table_data_with_create_table(project, dataset, table_id, body, schema)
         else
-          res = client.insert_all_table_data(project, dataset, table_id, body, {})
+          res = client.insert_all_table_data(project, dataset, table_id, body, **{})
         end
         log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size

         if res.insert_errors && !res.insert_errors.empty?
           log.warn "insert errors", project_id: project, dataset: dataset, table: table_id, insert_errors: res.insert_errors.to_s
@@ -156,14 +156,12 @@
         end

         res = client.insert_job(
           project,
           configuration,
-          {
-            upload_source: upload_source,
-            content_type: "application/octet-stream",
-          }
+          upload_source: upload_source,
+          content_type: "application/octet-stream",
         )
         JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id)
       rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
         log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message
@@ -341,10 +339,10 @@
         end
       end

       def insert_all_table_data_with_create_table(project, dataset, table_id, body, schema)
         try_count ||= 1
-        res = client.insert_all_table_data(project, dataset, table_id, body, {})
+        res = client.insert_all_table_data(project, dataset, table_id, body, **{})
       rescue Google::Apis::ClientError => e
         if e.status_code == 404 && /Not Found: Table/i =~ e.message
           if try_count == 1 # Table Not Found: Auto Create Table
             create_table(project, dataset, table_id, schema)