lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.3.0 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-0.3.1

- old (fluent-plugin-bigquery-0.3.0)
+ new (fluent-plugin-bigquery-0.3.1)

@@ -58,24 +58,32 @@
         @cached_client_expiration = Time.now + 1800
         @client = client
       end

-      def create_table(project, dataset, table_id, record_schema)
+      def create_table(project, dataset, table_id, record_schema, time_partitioning_type: nil, time_partitioning_expiration: nil)
         create_table_retry_limit = 3
         create_table_retry_wait = 1
         create_table_retry_count = 0

         begin
-          client.insert_table(project, dataset, {
+          definition = {
             table_reference: {
               table_id: table_id,
             },
             schema: {
               fields: record_schema.to_a,
             }
-          }, {})
+          }
+
+          if time_partitioning_type
+            definition[:time_partitioning] = {
+              type: time_partitioning_type.to_s.upcase,
+              expiration_ms: time_partitioning_expiration ? time_partitioning_expiration * 1000 : nil
+            }.compact
+          end
+          client.insert_table(project, dataset, definition, {})
           log.debug "create table", project_id: project, dataset: dataset, table: table_id
           @client = nil
         rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
           @client = nil
@@ -122,11 +130,11 @@
         body.merge!(template_suffix: template_suffix) if template_suffix
         res = client.insert_all_table_data(project, dataset, table_id, body, {
           options: {timeout_sec: timeout_sec, open_timeout_sec: open_timeout_sec}
         })
         log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size
-        log.warn "insert errors", insert_errors: res.insert_errors if res.insert_errors && !res.insert_errors.empty?
+        log.warn "insert errors", insert_errors: res.insert_errors.to_s if res.insert_errors && !res.insert_errors.empty?
       rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
         @client = nil

         reason = e.respond_to?(:reason) ? e.reason : nil
         log.error "tabledata.insertAll API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
@@ -136,11 +144,11 @@
         else
           raise UnRetryableError.new(nil, e)
         end
       end

-      def create_load_job(project, dataset, table_id, upload_source, job_id, fields, ignore_unknown_values: false, max_bad_records: 0, timeout_sec: nil, open_timeout_sec: 60)
+      def create_load_job(project, dataset, table_id, upload_source, job_id, fields, ignore_unknown_values: false, max_bad_records: 0, timeout_sec: nil, open_timeout_sec: 60, auto_create_table: nil, time_partitioning_type: nil, time_partitioning_expiration: nil)
         configuration = {
           configuration: {
             load: {
               destination_table: {
                 project_id: project,
@@ -155,10 +163,11 @@
               ignore_unknown_values: ignore_unknown_values,
               max_bad_records: max_bad_records,
             }
           }
         }
+        configuration[:configuration][:load].merge!(create_disposition: "CREATE_NEVER") if time_partitioning_type
         configuration.merge!({job_reference: {project_id: project, job_id: job_id}}) if job_id

         # If target table is already exist, omit schema configuration.
         # Because schema changing is easier.
         begin
@@ -186,10 +195,16 @@
         @client = nil

         reason = e.respond_to?(:reason) ? e.reason : nil
         log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason

-        return wait_load_job(project, dataset, job_id, table_id, retryable: false) if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
+        if auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ e.message
+          # Table Not Found: Auto Create Table
+          create_table(project, dataset, table_id, fields, time_partitioning_type: time_partitioning_type, time_partitioning_expiration: time_partitioning_expiration)
+          raise "table created. send rows next time."
+        end
+
+        return wait_load_job(project, dataset, job_id, table_id) if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
         if RETRYABLE_ERROR_REASON.include?(reason) || e.is_a?(Google::Apis::ServerError)
           raise RetryableError.new(nil, e)
         else
           raise UnRetryableError.new(nil, e)
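
Notes on the changes follow. First, the new create_table keywords in the first hunk feed BigQuery's timePartitioning table property. Here is a minimal sketch of the transformation, assuming a hypothetical writer instance and schema object; only create_table's signature and the hash it builds come from the diff above:

# Hypothetical call site; project/dataset/table names are placeholders.
writer.create_table(
  "my-project", "my_dataset", "access_log", schema,
  time_partitioning_type: :day,         # :day.to_s.upcase  => "DAY"
  time_partitioning_expiration: 86400   # seconds; * 1000   => expiration_ms: 86400000
)

# Inside create_table the definition hash then carries:
#   time_partitioning: { type: "DAY", expiration_ms: 86400000 }
# When time_partitioning_expiration is nil, the trailing .compact drops the
# nil expiration_ms entry, so the tables.insert payload omits it entirely.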
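The one-line change in the second hunk stringifies the insert errors before logging. A hedged reading of why: res.insert_errors is an array of structured google-api-client objects, and putting raw API objects into a Fluentd log field can produce bulky or unhelpful output; .to_s flattens the whole array into one printable string first:

# old: the raw object array goes into the log record
log.warn "insert errors", insert_errors: res.insert_errors if res.insert_errors && !res.insert_errors.empty?
# new: a plain String field
log.warn "insert errors", insert_errors: res.insert_errors.to_s if res.insert_errors && !res.insert_errors.empty?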
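The last three hunks work as a unit. With a partitioning type set, the load job is submitted with create_disposition: "CREATE_NEVER", so BigQuery will not auto-create the destination table (presumably because configuration.load offered no way to specify partitioning, so an implicitly created table would be unpartitioned). The first load against a missing table therefore fails with 404; the rescue creates the table itself via tables.insert with the partitioning options, then raises so Fluentd keeps the chunk buffered and resubmits the same rows on the next flush, by which time the table exists. A condensed sketch of that round trip, using the names from the diff with the rest of the method body elided:

begin
  # With CREATE_NEVER, a missing destination table surfaces as a 404
  # instead of being silently created without partitioning.
  client.insert_job(project, configuration,
                    upload_source: upload_source,
                    content_type: "application/octet-stream")
rescue Google::Apis::ClientError => e
  if auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ e.message
    create_table(project, dataset, table_id, fields,
                 time_partitioning_type: time_partitioning_type,
                 time_partitioning_expiration: time_partitioning_expiration)
    # Fail this flush on purpose; the buffered chunk is retried later.
    raise "table created. send rows next time."
  end
  raise
end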
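Also visible in the last hunk: the duplicate-job guard loses its retryable: false argument, but the mechanism is unchanged. When a job_id is attached via job_reference, resubmitting the same job (say, after a timeout on a flush that actually went through) gets HTTP 409 from BigQuery, and the writer polls the already-submitted job instead of loading the rows twice. An illustrative sketch; the job_id derivation and chunk_unique_id below are hypothetical, only the 409 handling mirrors the diff:

require "digest/sha1"

# Hypothetical: a deterministic job_id makes a retried flush resubmit
# under the same identity, enabling server-side deduplication.
job_id = "fluentd_#{Digest::SHA1.hexdigest(chunk_unique_id)}"

begin
  client.insert_job(project, configuration,
                    upload_source: upload_source,
                    content_type: "application/octet-stream")
rescue Google::Apis::ClientError => e
  # 409 "duplicate job": the first submission owns the load, so wait on
  # it rather than erroring out or loading the data a second time.
  return wait_load_job(project, dataset, job_id, table_id) if e.status_code == 409 && e.message =~ /Job/
  raise
end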