lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-1.0.0 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-1.1.0

- old
+ new

@@ -14,10 +14,13 @@ def client return @client if @client && @cached_client_expiration > Time.now client = Google::Apis::BigqueryV2::BigqueryService.new.tap do |cl| cl.authorization = get_auth + cl.client_options.open_timeout_sec = @options[:open_timeout_sec] if @options[:open_timeout_sec] + cl.client_options.read_timeout_sec = @options[:timeout_sec] if @options[:timeout_sec] + cl.client_options.send_timeout_sec = @options[:timeout_sec] if @options[:timeout_sec] end @cached_client_expiration = Time.now + 1800 @client = client end @@ -89,13 +92,11 @@ rows: rows, skip_invalid_rows: @options[:skip_invalid_rows], ignore_unknown_values: @options[:ignore_unknown_values], } body.merge!(template_suffix: template_suffix) if template_suffix - res = client.insert_all_table_data(project, dataset, table_id, body, { - options: {timeout_sec: @options[:timeout_sec], open_timeout_sec: @options[:open_timeout_sec]} - }) + res = client.insert_all_table_data(project, dataset, table_id, body, {}) log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size if res.insert_errors && !res.insert_errors.empty? log.warn "insert errors", project_id: project, dataset: dataset, table: table_id, insert_errors: res.insert_errors.to_s if @options[:allow_retry_insert_errors] @@ -135,11 +136,11 @@ }, schema: { fields: fields.to_a, }, write_disposition: "WRITE_APPEND", - source_format: "NEWLINE_DELIMITED_JSON", + source_format: source_format, ignore_unknown_values: @options[:ignore_unknown_values], max_bad_records: @options[:max_bad_records], } } } @@ -162,14 +163,10 @@ project, configuration, { upload_source: upload_source, content_type: "application/octet-stream", - options: { - timeout_sec: @options[:timeout_sec], - open_timeout_sec: @options[:open_timeout_sec], - } } ) wait_load_job(chunk_id, project, dataset, res.job_reference.job_id, table_id) @num_errors_per_chunk.delete(chunk_id) rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e @@ -296,9 +293,22 @@ def create_job_id(chunk_id, dataset, table, schema) job_id_key = "#{chunk_id}#{dataset}#{table}#{schema.to_s}#{@options[:max_bad_records]}#{@options[:ignore_unknown_values]}#{@num_errors_per_chunk[chunk_id]}" @log.debug "job_id_key: #{job_id_key}" "fluentd_job_" + Digest::SHA1.hexdigest(job_id_key) + end + + def source_format + case @options[:source_format] + when :json + "NEWLINE_DELIMITED_JSON" + when :avro + "AVRO" + when :csv + "CSV" + else + "NEWLINE_DELIMITED_JSON" + end end end end end