lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-1.2.0 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-2.0.0.beta

- old
+ new

@@ -5,26 +5,19 @@
        @auth_method = auth_method
        @scope = "https://www.googleapis.com/auth/bigquery"
        @options = options
        @log = log
        @num_errors_per_chunk = {}
-
-        @cached_client_expiration = Time.now + 1800
      end

      def client
-        return @client if @client && @cached_client_expiration > Time.now
-
-        client = Google::Apis::BigqueryV2::BigqueryService.new.tap do |cl|
+        @client ||= Google::Apis::BigqueryV2::BigqueryService.new.tap do |cl|
          cl.authorization = get_auth
          cl.client_options.open_timeout_sec = @options[:open_timeout_sec] if @options[:open_timeout_sec]
          cl.client_options.read_timeout_sec = @options[:timeout_sec] if @options[:timeout_sec]
          cl.client_options.send_timeout_sec = @options[:timeout_sec] if @options[:timeout_sec]
        end
-
-        @cached_client_expiration = Time.now + 1800
-        @client = client
      end

      def create_table(project, dataset, table_id, record_schema)
        create_table_retry_limit = 3
        create_table_retry_wait = 1
@@ -47,14 +40,11 @@
            expiration_ms: @options[:time_partitioning_expiration] ? @options[:time_partitioning_expiration] * 1000 : nil
          }.select { |_, value| !value.nil? }
        end
        client.insert_table(project, dataset, definition, {})
        log.debug "create table", project_id: project, dataset: dataset, table: table_id
-        @client = nil
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
-        @client = nil
-
        message = e.message
        if e.status_code == 409 && /Already Exists:/ =~ message
          log.debug "already created table", project_id: project, dataset: dataset, table: table_id
          # ignore 'Already Exists' error
          return
@@ -79,11 +69,10 @@
        schema = Fluent::BigQuery::Helper.deep_stringify_keys(res.schema.to_h[:fields])
        log.debug "Load schema from BigQuery: #{project}:#{dataset}.#{table_id} #{schema}"
        schema
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
-        @client = nil
        message = e.message
        log.error "tables.get API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message
        nil
      end
@@ -109,12 +98,10 @@
              raise Fluent::BigQuery::UnRetryableError.new("failed to insert into bigquery(insert errors), and cannot retry")
            end
          end
        end
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
-        @client = nil
-
        reason = e.respond_to?(:reason) ? e.reason : nil
        error_data = { project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason }
        wrapped = Fluent::BigQuery::Error.wrap(e)
        if wrapped.retryable?
          log.warn "tabledata.insertAll API", error_data
@@ -123,11 +110,21 @@
        end

        raise wrapped
      end

-      def create_load_job(chunk_id, project, dataset, table_id, upload_source, fields)
+      JobReference = Struct.new(:chunk_id, :chunk_id_hex, :project_id, :dataset_id, :table_id, :job_id) do
+        def as_hash(*keys)
+          if keys.empty?
+            to_h
+          else
+            to_h.select { |k, _| keys.include?(k) }
+          end
+        end
+      end
+
+      def create_load_job(chunk_id, chunk_id_hex, project, dataset, table_id, upload_source, fields)
        configuration = {
          configuration: {
            load: {
              destination_table: {
                project_id: project,
@@ -143,11 +140,11 @@
              max_bad_records: @options[:max_bad_records],
            }
          }
        }

-        job_id = create_job_id(chunk_id, dataset, table_id, fields.to_a) if @options[:prevent_duplicate_load]
+        job_id = create_job_id(chunk_id_hex, dataset, table_id, fields.to_a) if @options[:prevent_duplicate_load]
        configuration[:configuration][:load].merge!(create_disposition: "CREATE_NEVER") if @options[:time_partitioning_type]
        configuration.merge!({job_reference: {project_id: project, job_id: job_id}}) if job_id

        # If target table is already exist, omit schema configuration.
        # Because schema changing is easier.
@@ -165,15 +162,12 @@
          {
            upload_source: upload_source,
            content_type: "application/octet-stream",
          }
        )
-        wait_load_job(chunk_id, project, dataset, res.job_reference.job_id, table_id)
-        @num_errors_per_chunk.delete(chunk_id)
+        JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id)
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
-        @client = nil
-
        reason = e.respond_to?(:reason) ? e.reason : nil
        log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason

        if @options[:auto_create_table] && e.status_code == 404 && /Not Found: Table/i =~ e.message
          # Table Not Found: Auto Create Table
@@ -185,48 +179,60 @@
          )
          raise "table created. send rows next time."
        end

        if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
-          wait_load_job(chunk_id, project, dataset, job_id, table_id)
-          @num_errors_per_chunk.delete(chunk_id)
-          return
+          return JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, job_id)
        end

        raise Fluent::BigQuery::Error.wrap(e)
      end

-      def wait_load_job(chunk_id, project, dataset, job_id, table_id)
-        wait_interval = 10
-        _response = client.get_job(project, job_id)
+      def fetch_load_job(job_reference)
+        project = job_reference.project_id
+        job_id = job_reference.job_id

-        until _response.status.state == "DONE"
-          log.debug "wait for load job finish", state: _response.status.state, job_id: _response.job_reference.job_id
-          sleep wait_interval
-          _response = client.get_job(project, _response.job_reference.job_id)
+        res = client.get_job(project, job_id)
+        log.debug "load job fetched", id: job_id, state: res.status.state, **job_reference.as_hash(:project_id, :dataset_id, :table_id)
+
+        if res.status.state == "DONE"
+          res
        end
+      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
+        e = Fluent::BigQuery::Error.wrap(e)
+        raise e unless e.retryable?
+      end

-        errors = _response.status.errors
+      def commit_load_job(chunk_id_hex, response)
+        job_id = response.id
+        project = response.configuration.load.destination_table.project_id
+        dataset = response.configuration.load.destination_table.dataset_id
+        table_id = response.configuration.load.destination_table.table_id
+
+        errors = response.status.errors
        if errors
          errors.each do |e|
-            log.error "job.insert API (rows)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: e.message, reason: e.reason
+            log.error "job.load API (rows)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: e.message, reason: e.reason
          end
        end

-        error_result = _response.status.error_result
+        error_result = response.status.error_result
        if error_result
-          log.error "job.insert API (result)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: error_result.message, reason: error_result.reason
+          log.error "job.load API (result)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: error_result.message, reason: error_result.reason
          if Fluent::BigQuery::Error.retryable_error_reason?(error_result.reason)
-            @num_errors_per_chunk[chunk_id] = @num_errors_per_chunk[chunk_id].to_i + 1
+            @num_errors_per_chunk[chunk_id_hex] = @num_errors_per_chunk[chunk_id_hex].to_i + 1
            raise Fluent::BigQuery::RetryableError.new("failed to load into bigquery, retry")
          else
-            @num_errors_per_chunk.delete(chunk_id)
+            @num_errors_per_chunk.delete(chunk_id_hex)
            raise Fluent::BigQuery::UnRetryableError.new("failed to load into bigquery, and cannot retry")
          end
        end

-        log.debug "finish load job", state: _response.status.state
+        stats = response.statistics.load
+        duration = (response.statistics.end_time - response.statistics.creation_time) / 1000.0
+        log.debug "load job finished", id: job_id, state: response.status.state, input_file_bytes: stats.input_file_bytes, input_files: stats.input_files, output_bytes: stats.output_bytes, output_rows: stats.output_rows, bad_records: stats.bad_records, duration: duration.round(2), project_id: project, dataset: dataset, table: table_id
+        @num_errors_per_chunk.delete(chunk_id_hex)
      end

      private

      def log
@@ -289,11 +295,11 @@
      def safe_table_id(table_id)
        table_id.gsub(/\$\d+$/, "")
      end

-      def create_job_id(chunk_id, dataset, table, schema)
-        job_id_key = "#{chunk_id}#{dataset}#{table}#{schema.to_s}#{@options[:max_bad_records]}#{@options[:ignore_unknown_values]}#{@num_errors_per_chunk[chunk_id]}"
+      def create_job_id(chunk_id_hex, dataset, table, schema)
+        job_id_key = "#{chunk_id_hex}#{dataset}#{table}#{schema.to_s}#{@options[:max_bad_records]}#{@options[:ignore_unknown_values]}#{@num_errors_per_chunk[chunk_id_hex]}"
        @log.debug "job_id_key: #{job_id_key}"
        "fluentd_job_" + Digest::SHA1.hexdigest(job_id_key)
      end

      def source_format
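
For orientation, the change above replaces the old blocking wait_load_job with a three-step flow: create_load_job returns a JobReference immediately, fetch_load_job returns the job response only once its state is "DONE" (retryable API errors make it return nil instead of raising), and commit_load_job inspects the finished response, updates @num_errors_per_chunk, and raises a retryable or unretryable error when the job failed. A minimal sketch of a caller driving that flow follows; writer, chunk_id, chunk_id_hex, upload_io, fields, the project/dataset/table names, and the 10-second polling interval are illustrative placeholders, not values taken from this file.

    # Hypothetical caller of the new writer API; the polling cadence is an assumption.
    job_ref = writer.create_load_job(chunk_id, chunk_id_hex, "my-project", "my_dataset", "my_table", upload_io, fields)

    response = nil
    until response
      sleep 10                                        # assumed polling interval
      response = writer.fetch_load_job(job_ref)       # nil until the job state is "DONE"
    end

    writer.commit_load_job(chunk_id_hex, response)    # raises RetryableError / UnRetryableError on job failure

Compared with 1.2.0, where create_load_job slept inside wait_load_job until the job finished, the caller now keeps the JobReference and can check on the job later without holding the original call open.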