lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.4.1 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.4.2

- old
+ new

@@ -1,12 +1,12 @@
 module Fluent
   module BigQuery
     class Writer
-      def initialize(log, auth_method, auth_options = {})
+      def initialize(log, auth_method, options = {})
         @auth_method = auth_method
         @scope = "https://www.googleapis.com/auth/bigquery"
-        @auth_options = auth_options
+        @options = options
         @log = log

         @num_errors_per_chunk = {}
         @cached_client_expiration = Time.now + 1800
       end

@@ -20,11 +20,11 @@
         @cached_client_expiration = Time.now + 1800
         @client = client
       end

-      def create_table(project, dataset, table_id, record_schema, time_partitioning_type: nil, time_partitioning_expiration: nil)
+      def create_table(project, dataset, table_id, record_schema)
         create_table_retry_limit = 3
         create_table_retry_wait = 1
         create_table_retry_count = 0
         table_id = safe_table_id(table_id)

@@ -36,14 +36,14 @@
             schema: {
               fields: record_schema.to_a,
             }
           }

-          if time_partitioning_type
+          if @options[:time_partitioning_type]
             definition[:time_partitioning] = {
-              type: time_partitioning_type.to_s.upcase,
-              expiration_ms: time_partitioning_expiration ? time_partitioning_expiration * 1000 : nil
+              type: @options[:time_partitioning_type].to_s.upcase,
+              expiration_ms: @options[:time_partitioning_expiration] ? @options[:time_partitioning_expiration] * 1000 : nil
             }.compact
           end
           client.insert_table(project, dataset, definition, {})
           log.debug "create table", project_id: project, dataset: dataset, table: table_id
           @client = nil

@@ -82,25 +82,25 @@
         message = e.message
         log.error "tables.get API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message
         nil
       end

-      def insert_rows(project, dataset, table_id, rows, skip_invalid_rows: false, ignore_unknown_values: false, template_suffix: nil, timeout_sec: nil, open_timeout_sec: 60, allow_retry_insert_errors: false)
+      def insert_rows(project, dataset, table_id, rows, template_suffix: nil)
         body = {
           rows: rows,
-          skip_invalid_rows: skip_invalid_rows,
-          ignore_unknown_values: ignore_unknown_values,
+          skip_invalid_rows: @options[:skip_invalid_rows],
+          ignore_unknown_values: @options[:ignore_unknown_values],
         }
         body.merge!(template_suffix: template_suffix) if template_suffix
         res = client.insert_all_table_data(project, dataset, table_id, body, {
-          options: {timeout_sec: timeout_sec, open_timeout_sec: open_timeout_sec}
+          options: {timeout_sec: @options[:timeout_sec], open_timeout_sec: @options[:open_timeout_sec]}
         })
         log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size

         if res.insert_errors && !res.insert_errors.empty?
           log.warn "insert errors", project_id: project, dataset: dataset, table: table_id, insert_errors: res.insert_errors.to_s
-          if allow_retry_insert_errors
+          if @options[:allow_retry_insert_errors]
             is_included_any_retryable_insert_error = res.insert_errors.any? do |insert_error|
               insert_error.errors.any? { |error| Fluent::BigQuery::Error.retryable_insert_errors_reason?(error.reason) }
             end
             if is_included_any_retryable_insert_error
               raise Fluent::BigQuery::RetryableError.new("failed to insert into bigquery(insert errors), retry")

@@ -116,11 +116,11 @@
         log.error "tabledata.insertAll API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
         raise Fluent::BigQuery::Error.wrap(e)
       end

-      def create_load_job(chunk_id, project, dataset, table_id, upload_source, fields, prevent_duplicate_load: false, ignore_unknown_values: false, max_bad_records: 0, timeout_sec: nil, open_timeout_sec: 60, auto_create_table: nil, time_partitioning_type: nil, time_partitioning_expiration: nil)
+      def create_load_job(chunk_id, project, dataset, table_id, upload_source, fields)
         configuration = {
           configuration: {
             load: {
               destination_table: {
                 project_id: project,

@@ -130,18 +130,18 @@
               schema: {
                 fields: fields.to_a,
               },
               write_disposition: "WRITE_APPEND",
               source_format: "NEWLINE_DELIMITED_JSON",
-              ignore_unknown_values: ignore_unknown_values,
-              max_bad_records: max_bad_records,
+              ignore_unknown_values: @options[:ignore_unknown_values],
+              max_bad_records: @options[:max_bad_records],
             }
           }
         }

-        job_id = create_job_id(chunk_id, dataset, table_id, fields.to_a, max_bad_records, ignore_unknown_values) if prevent_duplicate_load
-        configuration[:configuration][:load].merge!(create_disposition: "CREATE_NEVER") if time_partitioning_type
+        job_id = create_job_id(chunk_id, dataset, table_id, fields.to_a) if @options[:prevent_duplicate_load]
+        configuration[:configuration][:load].merge!(create_disposition: "CREATE_NEVER") if @options[:time_partitioning_type]
         configuration.merge!({job_reference: {project_id: project, job_id: job_id}}) if job_id

         # If target table is already exist, omit schema configuration.
         # Because schema changing is easier.
         begin

@@ -157,12 +157,12 @@
           configuration,
           {
             upload_source: upload_source,
             content_type: "application/octet-stream",
             options: {
-              timeout_sec: timeout_sec,
-              open_timeout_sec: open_timeout_sec,
+              timeout_sec: @options[:timeout_sec],
+              open_timeout_sec: @options[:open_timeout_sec],
             }
           }
         )
         wait_load_job(chunk_id, project, dataset, res.job_reference.job_id, table_id)
         @num_errors_per_chunk.delete(chunk_id)

@@ -170,18 +170,23 @@
         @client = nil

         reason = e.respond_to?(:reason) ? e.reason : nil
         log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason

-        if auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ e.message
+        if @options[:auto_create_table] && e.status_code == 404 && /Not Found: Table/i =~ e.message
           # Table Not Found: Auto Create Table
-          create_table(project, dataset, table_id, fields, time_partitioning_type: time_partitioning_type, time_partitioning_expiration: time_partitioning_expiration)
+          create_table(
+            project,
+            dataset,
+            table_id,
+            fields,
+          )
           raise "table created. send rows next time."
         end

         if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
-          wait_load_job(chunk_id, project, dataset, job_id, table_id)
+          wait_load_job(chunk_id, project, dataset, job_id, table_id)
           @num_errors_per_chunk.delete(chunk_id)
           return
         end

         raise Fluent::BigQuery::Error.wrap(e)

@@ -240,13 +245,13 @@
         end
       end

       def get_auth_from_private_key
         require 'google/api_client/auth/key_utils'
-        private_key_path = @auth_options[:private_key_path]
-        private_key_passphrase = @auth_options[:private_key_passphrase]
-        email = @auth_options[:email]
+        private_key_path = @options[:private_key_path]
+        private_key_passphrase = @options[:private_key_passphrase]
+        email = @options[:email]

         key = Google::APIClient::KeyUtils.load_from_pkcs12(private_key_path, private_key_passphrase)
         Signet::OAuth2::Client.new(
           token_credential_uri: "https://accounts.google.com/o/oauth2/token",
           audience: "https://accounts.google.com/o/oauth2/token",

@@ -259,11 +264,11 @@
       def get_auth_from_compute_engine
         Google::Auth::GCECredentials.new
       end

       def get_auth_from_json_key
-        json_key = @auth_options[:json_key]
+        json_key = @options[:json_key]

         begin
           JSON.parse(json_key)
           key = StringIO.new(json_key)
           Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: key, scope: @scope)

@@ -281,11 +286,11 @@
       def safe_table_id(table_id)
         table_id.gsub(/\$\d+$/, "")
       end

-      def create_job_id(chunk_id, dataset, table, schema, max_bad_records, ignore_unknown_values)
-        job_id_key = "#{chunk_id}#{dataset}#{table}#{schema.to_s}#{max_bad_records}#{ignore_unknown_values}#{@num_errors_per_chunk[chunk_id]}"
+      def create_job_id(chunk_id, dataset, table, schema)
+        job_id_key = "#{chunk_id}#{dataset}#{table}#{schema.to_s}#{@options[:max_bad_records]}#{@options[:ignore_unknown_values]}#{@num_errors_per_chunk[chunk_id]}"
         @log.debug "job_id_key: #{job_id_key}"
         "fluentd_job_" + Digest::SHA1.hexdigest(job_id_key)
       end
     end
   end
 end
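
Net effect of the diff: 0.4.2 stops threading per-call keyword arguments (skip_invalid_rows, ignore_unknown_values, timeout_sec, open_timeout_sec, max_bad_records, prevent_duplicate_load, allow_retry_insert_errors, auto_create_table, time_partitioning_type, time_partitioning_expiration) through create_table, insert_rows, and create_load_job. They are merged with the old auth_options into a single hash handed to Writer#initialize and read back via @options inside each method.

A minimal caller-side sketch of the difference, assuming a json_key auth method and illustrative option values; the real call sites live in the output plugin (out_bigquery), which is not part of this file:

  # 0.4.1: tuning options accompany every method call.
  writer = Fluent::BigQuery::Writer.new(log, :json_key, json_key: json_key)
  writer.insert_rows(project, dataset, table_id, rows,
                     skip_invalid_rows: true,
                     ignore_unknown_values: true,
                     timeout_sec: 30,
                     allow_retry_insert_errors: false)

  # 0.4.2: auth options and tuning options are merged into one hash given to
  # Writer#initialize; insert_rows/create_load_job/create_table read @options.
  writer = Fluent::BigQuery::Writer.new(log, :json_key, {
    json_key: json_key,
    skip_invalid_rows: true,
    ignore_unknown_values: true,
    timeout_sec: 30,
    allow_retry_insert_errors: false,
  })
  writer.insert_rows(project, dataset, table_id, rows, template_suffix: nil)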