lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.3.4 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.4.0

- old
+ new

@@ -4,10 +4,11 @@
       def initialize(log, auth_method, auth_options = {})
         @auth_method = auth_method
         @scope = "https://www.googleapis.com/auth/bigquery"
         @auth_options = auth_options
         @log = log
+        @num_errors_per_chunk = {}
 
         @cached_client_expiration = Time.now + 1800
       end
 
       def client
@@ -102,11 +103,11 @@
         log.error "tabledata.insertAll API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
         raise Fluent::BigQuery::Error.wrap(e)
       end
 
-      def create_load_job(project, dataset, table_id, upload_source, job_id, fields, ignore_unknown_values: false, max_bad_records: 0, timeout_sec: nil, open_timeout_sec: 60, auto_create_table: nil, time_partitioning_type: nil, time_partitioning_expiration: nil)
+      def create_load_job(chunk_id, project, dataset, table_id, upload_source, fields, prevent_duplicate_load: false, ignore_unknown_values: false, max_bad_records: 0, timeout_sec: nil, open_timeout_sec: 60, auto_create_table: nil, time_partitioning_type: nil, time_partitioning_expiration: nil)
         configuration = {
           configuration: {
             load: {
               destination_table: {
                 project_id: project,
@@ -121,10 +122,12 @@
               ignore_unknown_values: ignore_unknown_values,
               max_bad_records: max_bad_records,
             }
           }
         }
+
+        job_id = create_job_id(chunk_id, dataset, table_id, fields.to_a, max_bad_records, ignore_unknown_values) if prevent_duplicate_load
         configuration[:configuration][:load].merge!(create_disposition: "CREATE_NEVER") if time_partitioning_type
         configuration.merge!({job_reference: {project_id: project, job_id: job_id}}) if job_id
 
         # If target table is already exist, omit schema configuration.
         # Because schema changing is easier.
@@ -146,11 +149,12 @@
             timeout_sec: timeout_sec,
             open_timeout_sec: open_timeout_sec,
           }
         }
       )
-        wait_load_job(project, dataset, res.job_reference.job_id, table_id)
+        wait_load_job(chunk_id, project, dataset, res.job_reference.job_id, table_id)
+        @num_errors_per_chunk.delete(chunk_id)
       rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
         @client = nil
 
         reason = e.respond_to?(:reason) ? e.reason : nil
         log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
@@ -159,16 +163,20 @@
           # Table Not Found: Auto Create Table
           create_table(project, dataset, table_id, fields, time_partitioning_type: time_partitioning_type, time_partitioning_expiration: time_partitioning_expiration)
           raise "table created. send rows next time."
         end
 
-        return wait_load_job(project, dataset, job_id, table_id) if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
+        if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
+          wait_load_job(chunk_id, project, dataset, job_id, table_id)
+          @num_errors_per_chunk.delete(chunk_id)
+          return
+        end
 
         raise Fluent::BigQuery::Error.wrap(e)
       end
 
-      def wait_load_job(project, dataset, job_id, table_id, retryable: true)
+      def wait_load_job(chunk_id, project, dataset, job_id, table_id)
        wait_interval = 10
        _response = client.get_job(project, job_id)
 
        until _response.status.state == "DONE"
          log.debug "wait for load job finish", state: _response.status.state, job_id: _response.job_reference.job_id
@@ -184,13 +192,15 @@
        end
 
        error_result = _response.status.error_result
        if error_result
          log.error "job.insert API (result)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: error_result.message, reason: error_result.reason
-          if retryable && Fluent::BigQuery::Error.retryable_error_reason?(error_result.reason)
+          if Fluent::BigQuery::Error.retryable_error_reason?(error_result.reason)
+            @num_errors_per_chunk[chunk_id] = @num_errors_per_chunk[chunk_id].to_i + 1
            raise Fluent::BigQuery::RetryableError.new("failed to load into bigquery, retry")
          else
+            @num_errors_per_chunk.delete(chunk_id)
            raise Fluent::BigQuery::UnRetryableError.new("failed to load into bigquery, and cannot retry")
          end
        end
 
        log.debug "finish load job", state: _response.status.state
@@ -256,9 +266,15 @@
         Google::Auth.get_application_default([@scope])
       end
 
       def safe_table_id(table_id)
         table_id.gsub(/\$\d+$/, "")
+      end
+
+      def create_job_id(chunk_id, dataset, table, schema, max_bad_records, ignore_unknown_values)
+        job_id_key = "#{chunk_id}#{dataset}#{table}#{schema.to_s}#{max_bad_records}#{ignore_unknown_values}#{@num_errors_per_chunk[chunk_id]}"
+        @log.debug "job_id_key: #{job_id_key}"
+        "fluentd_job_" + Digest::SHA1.hexdigest(job_id_key)
       end
     end
   end
 end
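The headline change in 0.4.0 is load-job deduplication. With the new prevent_duplicate_load option, create_load_job no longer takes a caller-supplied job_id; instead it derives one deterministically from the buffer chunk and its load settings via create_job_id. A retried chunk therefore resubmits under the same job ID, BigQuery rejects the duplicate with HTTP 409, and the writer falls through to wait_load_job on the job that is already running instead of loading the rows twice. The new @num_errors_per_chunk hash is mixed into the key so that a chunk whose previous job failed with a retryable error gets a fresh job ID on its next attempt; the counter is deleted once the chunk loads successfully or fails unretryably.

A minimal standalone sketch of that scheme, outside the plugin (the method name job_id_for and the sample values are illustrative, not from the gem):

  require "digest/sha1"

  # Same chunk + same load settings => same job id. The error count is
  # mixed in so a retryable failure produces a fresh id on the next try.
  def job_id_for(chunk_id, dataset, table, schema, max_bad_records, ignore_unknown_values, num_errors)
    key = "#{chunk_id}#{dataset}#{table}#{schema}#{max_bad_records}#{ignore_unknown_values}#{num_errors}"
    "fluentd_job_" + Digest::SHA1.hexdigest(key)
  end

  first_try  = job_id_for("chunk-1", "my_dataset", "my_table", "[]", 0, false, nil)
  retry_same = job_id_for("chunk-1", "my_dataset", "my_table", "[]", 0, false, nil)
  after_fail = job_id_for("chunk-1", "my_dataset", "my_table", "[]", 0, false, 1)

  first_try == retry_same # => true; resubmitting yields 409 "duplicate job"
  first_try == after_fail # => false; new attempt after a retryable failure

Relatedly, wait_load_job drops its retryable: keyword: in 0.4.0 retryability is decided solely from the job result's error reason via Error.retryable_error_reason?, with @num_errors_per_chunk doing the per-chunk bookkeeping.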