lib/embulk/output/bigquery.rb in embulk-output-bigquery-0.3.1 vs lib/embulk/output/bigquery.rb in embulk-output-bigquery-0.3.2

- old
+ new

@@ -1,8 +1,9 @@
 require 'json'
 require 'tempfile'
 require 'fileutils'
+require 'securerandom'
 require_relative 'bigquery/bigquery_client'
 require_relative 'bigquery/file_writer'
 require_relative 'bigquery/value_converter_factory'

 module Embulk
@@ -50,10 +51,11 @@
         'job_status_polling_interval' => config.param('job_status_polling_interval', :integer, :default => 10),
         'is_skip_job_result_check'    => config.param('is_skip_job_result_check',    :bool,    :default => false),
         'prevent_duplicate_insert'    => config.param('prevent_duplicate_insert',    :bool,    :default => false),
         'with_rehearsal'              => config.param('with_rehearsal',              :bool,    :default => false),
         'rehearsal_counts'            => config.param('rehearsal_counts',            :integer, :default => 1000),
+        'abort_on_error'              => config.param('abort_on_error',              :bool,    :default => nil),
         'column_options'              => config.param('column_options',              :array,   :default => []),
         'default_timezone'            => config.param('default_timezone',            :string,  :default => ValueConverterFactory::DEFAULT_TIMEZONE),
         'default_timestamp_format'    => config.param('default_timestamp_format',    :string,  :default => ValueConverterFactory::DEFAULT_TIMESTAMP_FORMAT),
         'payload_column'              => config.param('payload_column',              :string,  :default => nil),
@@ -191,11 +193,11 @@
           file_ext << '.gz'
         end
         task['file_ext'] = file_ext
       end

-      unique_name = "%08x%08x%08x" % [Process.pid, now.tv_sec, now.tv_nsec]
+      unique_name = SecureRandom.uuid.gsub('-', '_')

       if %w[replace replace_backup append].include?(task['mode'])
         task['temp_table'] ||= "LOAD_TEMP_#{unique_name}_#{task['table']}"
       end
@@ -205,10 +207,14 @@

       if task['sdk_log_level']
         Google::Apis.logger.level = eval("::Logger::#{task['sdk_log_level'].upcase}")
       end

+      if task['abort_on_error'].nil?
+        task['abort_on_error'] = (task['max_bad_records'] == 0)
+      end
+
       task
     end

     def self.bigquery
       @bigquery
@@ -224,16 +230,20 @@

     def self.rehearsal_thread=(rehearsal_thread)
       @rehearsal_thread = rehearsal_thread
     end

-    def self.transaction_report(file_writers, responses, target_table)
+    def self.transaction_report(task, responses)
       num_input_rows = file_writers.empty? ? 0 : file_writers.map(&:num_rows).inject(:+)
       num_response_rows = responses.inject(0) do |sum, response|
         sum + (response ? response.statistics.load.output_rows.to_i : 0)
       end
-      num_output_rows = bigquery.get_table(target_table).num_rows.to_i
+      if task['temp_table']
+        num_output_rows = bigquery.get_table(task['temp_table']).num_rows.to_i
+      else
+        num_output_rows = num_response_rows
+      end
       num_rejected_rows = num_input_rows - num_output_rows
       transaction_report = {
         'num_input_rows'    => num_input_rows,
         'num_response_rows' => num_response_rows,
         'num_output_rows'   => num_output_rows,
@@ -302,11 +312,18 @@
         if task['skip_load'] # only for debug
           Embulk.logger.info { "embulk-output-bigquery: Skip load" }
         else
           target_table = task['temp_table'] ? task['temp_table'] : task['table']
           responses = bigquery.load_in_parallel(paths, target_table)
-          transaction_report = self.transaction_report(file_writers, responses, target_table)
+          transaction_report = self.transaction_report(task, responses)
           Embulk.logger.info { "embulk-output-bigquery: transaction_report: #{transaction_report.to_json}" }
+
+          if task['abort_on_error']
+            if transaction_report['num_input_rows'] != transaction_report['num_output_rows']
+              raise Error, "ABORT: `num_input_rows (#{transaction_report['num_input_rows']})` and " \
+                "`num_output_rows (#{transaction_report['num_output_rows']})` does not match"
+            end
+          end
         end

         if task['mode'] == 'replace_backup'
           bigquery.copy(task['table'], task['table_old'], task['dataset_old'])
         end
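
Two of the hunks above change runtime behavior rather than configuration. First, the temp-table name: 0.3.1 built it from the pid and the current time, which can collide when separate Embulk processes happen to share a pid and a timestamp; 0.3.2 draws it from SecureRandom instead. A minimal sketch of the new naming (the table name my_table is an illustrative placeholder):

    require 'securerandom'

    # 0.3.1 scheme, collision-prone across hosts:
    #   unique_name = "%08x%08x%08x" % [Process.pid, now.tv_sec, now.tv_nsec]

    # 0.3.2 scheme: a random v4 UUID; the dashes are swapped for underscores,
    # presumably because BigQuery table names may not contain '-'.
    unique_name = SecureRandom.uuid.gsub('-', '_')
    temp_table  = "LOAD_TEMP_#{unique_name}_my_table"
    puts temp_table  # e.g. LOAD_TEMP_9b2f..._my_table (32 hex digits, 4 underscores)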
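
Second, the new abort_on_error option is declared with :default => nil, and the hunk at line 205 resolves nil at transaction time: abort exactly when max_bad_records is 0, i.e. when the load tolerates no bad rows. A sketch of that defaulting rule (resolve_abort_on_error is a hypothetical name, not a method in the plugin):

    # If the user says nothing, abort_on_error follows max_bad_records.
    def resolve_abort_on_error(task)
      if task['abort_on_error'].nil?
        task['abort_on_error'] = (task['max_bad_records'] == 0)
      end
      task
    end

    resolve_abort_on_error('abort_on_error' => nil, 'max_bad_records' => 0)
    # => { 'abort_on_error' => true,  'max_bad_records' => 0 }
    resolve_abort_on_error('abort_on_error' => nil, 'max_bad_records' => 10)
    # => { 'abort_on_error' => false, 'max_bad_records' => 10 }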
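
Finally, the abort check itself. transaction_report now receives the task so it can count num_output_rows from the temp table when one exists (replace, replace_backup, and append modes) and fall back to the load-job statistics otherwise; when abort_on_error is set and the input and output row counts disagree, the transaction raises instead of silently dropping rows. A self-contained sketch of the comparison, assuming only that the report hash is shaped like the one the plugin logs (check_abort is a hypothetical helper, not part of the plugin):

    # Mirrors the guard added in the last hunk.
    def check_abort(report, abort_on_error: true)
      return report unless abort_on_error
      if report['num_input_rows'] != report['num_output_rows']
        raise "ABORT: num_input_rows (#{report['num_input_rows']}) and " \
              "num_output_rows (#{report['num_output_rows']}) do not match"
      end
      report
    end

    check_abort('num_input_rows' => 100, 'num_output_rows' => 100)  # returns the report
    check_abort('num_input_rows' => 100, 'num_output_rows' => 98)   # raises RuntimeError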