lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-1.0.0 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-1.1.0
- removed in 1.1.0
+ added in 1.1.0
@@ -14,10 +14,13 @@
      def client
        return @client if @client && @cached_client_expiration > Time.now

        client = Google::Apis::BigqueryV2::BigqueryService.new.tap do |cl|
          cl.authorization = get_auth
+         cl.client_options.open_timeout_sec = @options[:open_timeout_sec] if @options[:open_timeout_sec]
+         cl.client_options.read_timeout_sec = @options[:timeout_sec] if @options[:timeout_sec]
+         cl.client_options.send_timeout_sec = @options[:timeout_sec] if @options[:timeout_sec]
        end

        @cached_client_expiration = Time.now + 1800
        @client = client
      end
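
Note: 1.1.0 moves the HTTP timeouts onto the cached service client itself via google-api-client's ClientOptions, so they are configured once per client (the client is rebuilt every 1800 seconds) instead of being passed on every request. A single timeout_sec option drives both the read and send timeouts. A minimal standalone sketch of the same configuration, with a hypothetical options hash standing in for the plugin's @options:

    require "google/apis/bigquery_v2"

    options = { open_timeout_sec: 60, timeout_sec: 120 }  # hypothetical values

    client = Google::Apis::BigqueryV2::BigqueryService.new.tap do |cl|
      # Timeout for establishing the connection
      cl.client_options.open_timeout_sec = options[:open_timeout_sec] if options[:open_timeout_sec]
      # One timeout_sec value covers both reading the response and sending the request
      cl.client_options.read_timeout_sec = options[:timeout_sec] if options[:timeout_sec]
      cl.client_options.send_timeout_sec = options[:timeout_sec] if options[:timeout_sec]
    end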
@@ -89,13 +92,11 @@
          rows: rows,
          skip_invalid_rows: @options[:skip_invalid_rows],
          ignore_unknown_values: @options[:ignore_unknown_values],
        }
        body.merge!(template_suffix: template_suffix) if template_suffix
-       res = client.insert_all_table_data(project, dataset, table_id, body, {
-         options: {timeout_sec: @options[:timeout_sec], open_timeout_sec: @options[:open_timeout_sec]}
-       })
+       res = client.insert_all_table_data(project, dataset, table_id, body, {})
        log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size

        if res.insert_errors && !res.insert_errors.empty?
          log.warn "insert errors", project_id: project, dataset: dataset, table: table_id, insert_errors: res.insert_errors.to_s
          if @options[:allow_retry_insert_errors]
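
Note: the per-request timeout options are dropped here because the client-level settings above now apply to every call; the trailing {} simply passes no extra keyword arguments to insert_all_table_data. A hedged sketch of the request body this code assembles (the row shape and values are assumptions for illustration, not taken from this file):

    body = {
      rows: [{ insert_id: "a1b2c3", json: { message: "hello" } }],  # assumed row shape
      skip_invalid_rows: true,
      ignore_unknown_values: true,
    }
    body.merge!(template_suffix: "_20170101")  # only when a template suffix is configured
    res = client.insert_all_table_data(project, dataset, table_id, body, {})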
@@ -135,11 +136,11 @@
              },
              schema: {
                fields: fields.to_a,
              },
              write_disposition: "WRITE_APPEND",
-             source_format: "NEWLINE_DELIMITED_JSON",
+             source_format: source_format,
              ignore_unknown_values: @options[:ignore_unknown_values],
              max_bad_records: @options[:max_bad_records],
            }
          }
        }
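
Note: the load job's source format is no longer hardcoded to newline-delimited JSON; it is resolved by the new private source_format helper (added at the end of this diff) from @options[:source_format]. For example, with @options[:source_format] = :csv the generated load section would look like this (a hedged illustration, unrelated keys elided):

    {
      load: {
        write_disposition: "WRITE_APPEND",
        source_format: "CSV",
        ignore_unknown_values: @options[:ignore_unknown_values],
        max_bad_records: @options[:max_bad_records],
        # destination_table and schema omitted for brevity
      }
    }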
@@ -162,14 +163,10 @@
          project,
          configuration,
          {
            upload_source: upload_source,
            content_type: "application/octet-stream",
-           options: {
-             timeout_sec: @options[:timeout_sec],
-             open_timeout_sec: @options[:open_timeout_sec],
-           }
          }
        )
        wait_load_job(chunk_id, project, dataset, res.job_reference.job_id, table_id)
        @num_errors_per_chunk.delete(chunk_id)
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
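
Note: as with insert_all_table_data above, the per-request timeout options are removed from insert_job in favor of the client-level settings. A minimal standalone sketch of the same call shape (the payload and surrounding variables are assumed; the upload is sent as an octet-stream media body and the returned job id is what wait_load_job polls):

    require "stringio"

    upload_source = StringIO.new(%({"message":"hello"}\n))  # assumed NDJSON payload
    res = client.insert_job(
      project,
      configuration,
      upload_source: upload_source,
      content_type: "application/octet-stream"
    )
    res.job_reference.job_id  # polled by wait_load_job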
@@ -296,9 +293,22 @@
      def create_job_id(chunk_id, dataset, table, schema)
        job_id_key = "#{chunk_id}#{dataset}#{table}#{schema.to_s}#{@options[:max_bad_records]}#{@options[:ignore_unknown_values]}#{@num_errors_per_chunk[chunk_id]}"
        @log.debug "job_id_key: #{job_id_key}"
        "fluentd_job_" + Digest::SHA1.hexdigest(job_id_key)
+     end
+
+     def source_format
+       case @options[:source_format]
+       when :json
+         "NEWLINE_DELIMITED_JSON"
+       when :avro
+         "AVRO"
+       when :csv
+         "CSV"
+       else
+         "NEWLINE_DELIMITED_JSON"
+       end
      end
    end
  end
end
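
Note: the new source_format helper defaults to NEWLINE_DELIMITED_JSON, so existing configurations keep the 1.0.0 behavior; only :avro and :csv change the load format. An equivalent table-driven phrasing, shown purely as a hypothetical alternative to the case expression:

    SOURCE_FORMATS = {
      json: "NEWLINE_DELIMITED_JSON",
      avro: "AVRO",
      csv:  "CSV",
    }.freeze

    def source_format
      # Unrecognized or missing values fall back to the old hardcoded default
      SOURCE_FORMATS.fetch(@options[:source_format], "NEWLINE_DELIMITED_JSON")
    end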