lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-2.0.0 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-2.1.0

- old
+ new

@@ -32,31 +32,24 @@
           schema: {
             fields: record_schema.to_a,
           }
         }

-        if @options[:time_partitioning_type]
-          definition[:time_partitioning] = {
-            type: @options[:time_partitioning_type].to_s.upcase,
-            field: @options[:time_partitioning_field] ? @options[:time_partitioning_field].to_s : nil,
-            expiration_ms: @options[:time_partitioning_expiration] ? @options[:time_partitioning_expiration] * 1000 : nil
-          }.select { |_, value| !value.nil? }
-        end
+        definition.merge!(time_partitioning: time_partitioning) if time_partitioning
         client.insert_table(project, dataset, definition, {})
         log.debug "create table", project_id: project, dataset: dataset, table: table_id
       rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
         message = e.message
         if e.status_code == 409 && /Already Exists:/ =~ message
           log.debug "already created table", project_id: project, dataset: dataset, table: table_id
           # ignore 'Already Exists' error
           return
         end

-        reason = e.respond_to?(:reason) ? e.reason : nil
-        log.error "tables.insert API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message, reason: reason
+        log.error "tables.insert API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message

-        if Fluent::BigQuery::Error.retryable_error_reason?(reason) && create_table_retry_count < create_table_retry_limit
+        if create_table_retry_count < create_table_retry_limit
           sleep create_table_retry_wait
           create_table_retry_wait *= 2
           create_table_retry_count += 1
           retry
         else
@@ -75,18 +68,23 @@
         message = e.message
         log.error "tables.get API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message
         nil
       end

-      def insert_rows(project, dataset, table_id, rows, template_suffix: nil)
+      def insert_rows(project, dataset, table_id, rows, schema, template_suffix: nil)
         body = {
           rows: rows,
           skip_invalid_rows: @options[:skip_invalid_rows],
           ignore_unknown_values: @options[:ignore_unknown_values],
         }
         body.merge!(template_suffix: template_suffix) if template_suffix
-        res = client.insert_all_table_data(project, dataset, table_id, body, {})
+
+        if @options[:auto_create_table]
+          res = insert_all_table_data_with_create_table(project, dataset, table_id, body, schema)
+        else
+          res = client.insert_all_table_data(project, dataset, table_id, body, {})
+        end
         log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size

         if res.insert_errors && !res.insert_errors.empty?
           log.warn "insert errors", project_id: project, dataset: dataset, table: table_id, insert_errors: res.insert_errors.to_s
           if @options[:allow_retry_insert_errors]
@@ -99,12 +97,11 @@
               raise Fluent::BigQuery::UnRetryableError.new("failed to insert into bigquery(insert errors), and cannot retry")
             end
           end
         end
       rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
-        reason = e.respond_to?(:reason) ? e.reason : nil
-        error_data = { project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason }
+        error_data = { project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message }
         wrapped = Fluent::BigQuery::Error.wrap(e)
         if wrapped.retryable?
           log.warn "tabledata.insertAll API", error_data
         else
           log.error "tabledata.insertAll API", error_data
@@ -130,33 +127,31 @@
               destination_table: {
                 project_id: project,
                 dataset_id: dataset,
                 table_id: table_id,
               },
-              schema: {
-                fields: fields.to_a,
-              },
               write_disposition: "WRITE_APPEND",
               source_format: source_format,
               ignore_unknown_values: @options[:ignore_unknown_values],
               max_bad_records: @options[:max_bad_records],
             }
           }
         }

         job_id = create_job_id(chunk_id_hex, dataset, table_id, fields.to_a) if @options[:prevent_duplicate_load]
-        configuration[:configuration][:load].merge!(create_disposition: "CREATE_NEVER") if @options[:time_partitioning_type]
         configuration.merge!({job_reference: {project_id: project, job_id: job_id}}) if job_id

-        # If target table is already exist, omit schema configuration.
-        # Because schema changing is easier.
         begin
-          if client.get_table(project, dataset, table_id)
-            configuration[:configuration][:load].delete(:schema)
+          # Check table existance
+          client.get_table(project, dataset, table_id)
+        rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
+          if e.status_code == 404 && /Not Found: Table/i =~ e.message
+            raise Fluent::BigQuery::UnRetryableError.new("Table is not found") unless @options[:auto_create_table]
+            raise Fluent::BigQuery::UnRetryableError.new("Schema is empty") if fields.empty?
+            configuration[:configuration][:load].merge!(schema: {fields: fields.to_a})
+            configuration[:configuration][:load].merge!(time_partitioning: time_partitioning) if time_partitioning
           end
-        rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError
-          raise Fluent::BigQuery::UnRetryableError.new("Schema is empty") if fields.empty?
         end

         res = client.insert_job(
           project,
           configuration,
@@ -165,24 +160,12 @@
             content_type: "application/octet-stream",
           }
         )
         JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id)
       rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
-        reason = e.respond_to?(:reason) ? e.reason : nil
-        log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason
+        log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message

-        if @options[:auto_create_table] && e.status_code == 404 && /Not Found: Table/i =~ e.message
-          # Table Not Found: Auto Create Table
-          create_table(
-            project,
-            dataset,
-            table_id,
-            fields,
-          )
-          raise "table created. send rows next time."
-        end
-
         if job_id && e.status_code == 409 && e.message =~ /Job/
           # duplicate load job
           return JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, job_id)
         end
         raise Fluent::BigQuery::Error.wrap(e)
@@ -314,9 +297,46 @@
         when :csv
           "CSV"
         else
           "NEWLINE_DELIMITED_JSON"
         end
+      end
+
+      def time_partitioning
+        return @time_partitioning if instance_variable_defined?(:@time_partitioning)
+
+        if @options[:time_partitioning_type]
+          @time_partitioning = {
+            type: @options[:time_partitioning_type].to_s.upcase,
+            field: @options[:time_partitioning_field] ? @options[:time_partitioning_field].to_s : nil,
+            expiration_ms: @options[:time_partitioning_expiration] ? @options[:time_partitioning_expiration] * 1000 : nil,
+            require_partition_filter: @options[:time_partitioning_require_partition_filter],
+          }.reject { |_, v| v.nil? }
+        else
+          @time_partitioning
+        end
+      end
+
+      def insert_all_table_data_with_create_table(project, dataset, table_id, body, schema)
+        try_count ||= 1
+        res = client.insert_all_table_data(project, dataset, table_id, body, {})
+      rescue Google::Apis::ClientError => e
+        if e.status_code == 404 && /Not Found: Table/i =~ e.message
+          if try_count == 1
+            # Table Not Found: Auto Create Table
+            create_table(project, dataset, table_id, schema)
+          elsif try_count > 10
+            raise "A new table was created but it is not found."
+          end
+
+          # Retry to insert several times because the created table is not visible from Streaming insert for a little while
+          # cf. https://cloud.google.com/bigquery/troubleshooting-errors#metadata-errors-for-streaming-inserts
+          try_count += 1
+          sleep 5
+          log.debug "Retry to insert rows", project_id: project, dataset: dataset, table: table_id
+          retry
+        end
+        raise
       end
     end
   end
 end
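Note on the new time_partitioning helper: the partitioning hash that 2.0.0 built inline inside create_table is now memoized in one place and merged into both the table definition and the load-job configuration. A minimal standalone sketch (not part of the gem, option values are hypothetical) of what the helper returns; nil entries are dropped by reject, so only configured keys reach the BigQuery API:

    options = {
      time_partitioning_type: :day,
      time_partitioning_field: "created_at",
      time_partitioning_expiration: nil,                    # not configured
      time_partitioning_require_partition_filter: true,
    }

    time_partitioning = {
      type: options[:time_partitioning_type].to_s.upcase,
      field: options[:time_partitioning_field] ? options[:time_partitioning_field].to_s : nil,
      expiration_ms: options[:time_partitioning_expiration] ? options[:time_partitioning_expiration] * 1000 : nil,
      require_partition_filter: options[:time_partitioning_require_partition_filter],
    }.reject { |_, v| v.nil? }

    # => {:type=>"DAY", :field=>"created_at", :require_partition_filter=>true}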
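The new insert_all_table_data_with_create_table relies on Ruby's implicit begin around a method body: try_count is initialized with ||= so it keeps its value across each retry, and the rescue clause re-enters the body until the just-created table becomes visible to streaming inserts or the attempt budget runs out. This is also why insert_rows now takes the record schema as an extra positional argument, so the writer can create the missing table itself. A self-contained sketch of the idiom (fetch_table, NotFoundError and the short sleep are illustrative names, not the gem's code):

    class NotFoundError < StandardError; end

    def fetch_table(attempt)
      # Pretend the table only becomes visible on the third attempt.
      raise NotFoundError, "Not Found: Table" if attempt < 3
      "table metadata"
    end

    def fetch_with_retry
      try_count ||= 1
      fetch_table(try_count)
    rescue NotFoundError
      raise "gave up after #{try_count} attempts" if try_count > 10
      try_count += 1
      sleep 0.1  # the gem waits 5 seconds between streaming-insert retries
      retry
    end

    puts fetch_with_retry  # => "table metadata" on the third pass through the body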
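Impact on the load-job path: 2.0.0 always sent a schema with the load job and deleted it again when the table already existed; 2.1.0 omits the schema by default and only merges schema and time_partitioning into the load configuration after get_table raises a 404 and auto_create_table is enabled. A rough sketch of the resulting configuration (project, dataset, table and field values are hypothetical):

    configuration = {
      configuration: {
        load: {
          destination_table: { project_id: "my-project", dataset_id: "my_dataset", table_id: "access_log" },
          write_disposition: "WRITE_APPEND",
          source_format: "NEWLINE_DELIMITED_JSON",
        }
      }
    }

    # Only when the table is missing and auto_create_table is enabled:
    configuration[:configuration][:load].merge!(schema: { fields: [{ name: "time", type: "TIMESTAMP" }] })
    configuration[:configuration][:load].merge!(time_partitioning: { type: "DAY", field: "time" })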