lib/embulk/output/bigquery.rb: embulk-output-bigquery 0.3.1 vs 0.3.2
- old
+ new
@@ -1,8 +1,9 @@
 require 'json'
 require 'tempfile'
 require 'fileutils'
+require 'securerandom'
 require_relative 'bigquery/bigquery_client'
 require_relative 'bigquery/file_writer'
 require_relative 'bigquery/value_converter_factory'

 module Embulk
@@ -50,10 +51,11 @@
           'job_status_polling_interval' => config.param('job_status_polling_interval', :integer, :default => 10),
           'is_skip_job_result_check' => config.param('is_skip_job_result_check', :bool, :default => false),
           'prevent_duplicate_insert' => config.param('prevent_duplicate_insert', :bool, :default => false),
           'with_rehearsal' => config.param('with_rehearsal', :bool, :default => false),
           'rehearsal_counts' => config.param('rehearsal_counts', :integer, :default => 1000),
+          'abort_on_error' => config.param('abort_on_error', :bool, :default => nil),
           'column_options' => config.param('column_options', :array, :default => []),
           'default_timezone' => config.param('default_timezone', :string, :default => ValueConverterFactory::DEFAULT_TIMEZONE),
           'default_timestamp_format' => config.param('default_timestamp_format', :string, :default => ValueConverterFactory::DEFAULT_TIMESTAMP_FORMAT),
           'payload_column' => config.param('payload_column', :string, :default => nil),
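Note the `:default => nil` on the new `abort_on_error` option: unlike the other booleans here it is deliberately tri-state, so an explicit `abort_on_error: false` in the Embulk config can later be told apart from leaving the option unset (the unset case is resolved from `max_bad_records` in the hunk at @@ -205 below). A sketch of the user-facing side, written as the equivalent Ruby hash rather than YAML; the keys are from this diff, the values are illustrative:

    # Illustrative fragment of an out: section, as the task hash Embulk
    # builds from it. Omitting 'abort_on_error' leaves it nil, deferring
    # to the max_bad_records-based default added later in this diff.
    out_config = {
      'type'            => 'bigquery',
      'max_bad_records' => 0,     # how many bad rows a load job may drop
      'abort_on_error'  => true,  # fail the transaction if any row is lost
    }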
@@ -191,11 +193,11 @@
             file_ext << '.gz'
           end
           task['file_ext'] = file_ext
         end

-        unique_name = "%08x%08x%08x" % [Process.pid, now.tv_sec, now.tv_nsec]
+        unique_name = SecureRandom.uuid.gsub('-', '_')

         if %w[replace replace_backup append].include?(task['mode'])
           task['temp_table'] ||= "LOAD_TEMP_#{unique_name}_#{task['table']}"
         end
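The temp-table suffix switches from a pid/timestamp recipe to a UUID. The old `%08x%08x%08x` string is only unique per host and per instant; two runs that happen to share a pid and a clock reading could in principle collide on the same `LOAD_TEMP_...` table. A runnable side-by-side sketch of the two schemes:

    require 'securerandom'

    now = Time.now
    # 0.3.1: pid + seconds + nanoseconds, hex-packed.
    old_name = '%08x%08x%08x' % [Process.pid, now.tv_sec, now.tv_nsec]
    # 0.3.2: a random v4 UUID; '-' becomes '_' so the fragment stays
    # within BigQuery's [A-Za-z0-9_] table-name alphabet.
    new_name = SecureRandom.uuid.gsub('-', '_')

    puts old_name  #=> e.g. "000041ba58d33c9a2d35f9c0"
    puts new_name  #=> e.g. "9b1deb4d_3b7d_4bad_9bdd_2b0d7b3dcb6d"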
@@ -205,10 +207,14 @@
         if task['sdk_log_level']
           Google::Apis.logger.level = eval("::Logger::#{task['sdk_log_level'].upcase}")
         end

+        if task['abort_on_error'].nil?
+          task['abort_on_error'] = (task['max_bad_records'] == 0)
+        end
+
         task
       end

       def self.bigquery
         @bigquery
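This is where the tri-state default pays off: only a genuinely unset (`nil`) `abort_on_error` is derived from `max_bad_records`, on the reasoning that a user who tolerates zero bad records presumably also wants the run to fail when rows go missing. A minimal truth-table sketch, assuming a plain Hash in place of Embulk's task object:

    def resolve_abort_on_error(abort_on_error, max_bad_records)
      # Unset => strict exactly when no bad records are tolerated;
      # an explicit true/false always wins.
      abort_on_error.nil? ? (max_bad_records == 0) : abort_on_error
    end

    resolve_abort_on_error(nil,   0)   #=> true
    resolve_abort_on_error(nil,   100) #=> false
    resolve_abort_on_error(false, 0)   #=> false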
@@ -224,16 +230,20 @@
       def self.rehearsal_thread=(rehearsal_thread)
         @rehearsal_thread = rehearsal_thread
       end

-      def self.transaction_report(file_writers, responses, target_table)
+      def self.transaction_report(task, responses)
         num_input_rows = file_writers.empty? ? 0 : file_writers.map(&:num_rows).inject(:+)
         num_response_rows = responses.inject(0) do |sum, response|
           sum + (response ? response.statistics.load.output_rows.to_i : 0)
         end
-        num_output_rows = bigquery.get_table(target_table).num_rows.to_i
+        if task['temp_table']
+          num_output_rows = bigquery.get_table(task['temp_table']).num_rows.to_i
+        else
+          num_output_rows = num_response_rows
+        end
         num_rejected_rows = num_input_rows - num_output_rows
         transaction_report = {
           'num_input_rows' => num_input_rows,
           'num_response_rows' => num_response_rows,
           'num_output_rows' => num_output_rows,
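Two things changed here. The signature drops both `file_writers` and `target_table` (the `file_writers` call remaining in the body presumably resolves to a class-level accessor rather than the removed parameter), and the output count is no longer read from the final target table: with a temp table the count comes from that freshly loaded table, and without one it falls back to the totals reported by the load jobs, which avoids counting rows that already existed in an appended-to table. An illustrative report as it would appear in the log; field names follow this diff, values are made up:

    # Hypothetical transaction_report for a run that dropped two rows:
    report = {
      'num_input_rows'    => 1000, # rows written by all file writers
      'num_response_rows' => 1000, # rows the load-job statistics reported
      'num_output_rows'   =>  998, # rows counted in the temp table
      'num_rejected_rows' =>    2, # input minus output
    }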
@@ -302,11 +312,18 @@
         if task['skip_load'] # only for debug
           Embulk.logger.info { "embulk-output-bigquery: Skip load" }
         else
           target_table = task['temp_table'] ? task['temp_table'] : task['table']
           responses = bigquery.load_in_parallel(paths, target_table)
-          transaction_report = self.transaction_report(file_writers, responses, target_table)
+          transaction_report = self.transaction_report(task, responses)
           Embulk.logger.info { "embulk-output-bigquery: transaction_report: #{transaction_report.to_json}" }
+
+          if task['abort_on_error']
+            if transaction_report['num_input_rows'] != transaction_report['num_output_rows']
+              raise Error, "ABORT: `num_input_rows (#{transaction_report['num_input_rows']})` and " \
+                           "`num_output_rows (#{transaction_report['num_output_rows']})` do not match"
+            end
+          end

           if task['mode'] == 'replace_backup'
             bigquery.copy(task['table'], task['table_old'], task['dataset_old'])
           end
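The net effect: when `abort_on_error` resolves to true, a mismatch between input and output rows now raises instead of merely being logged, so silently dropped rows fail the Embulk transaction before the subsequent copy steps would make them permanent. A minimal sketch of the guard in isolation, assuming a plain report Hash and a stand-in Error class:

    Error = Class.new(StandardError)  # stand-in for the plugin's Error class

    def abort_on_mismatch!(report)
      return if report['num_input_rows'] == report['num_output_rows']
      raise Error, "ABORT: `num_input_rows (#{report['num_input_rows']})` and " \
                   "`num_output_rows (#{report['num_output_rows']})` do not match"
    end

    abort_on_mismatch!('num_input_rows' => 10, 'num_output_rows' => 10) # no-op
    begin
      abort_on_mismatch!('num_input_rows' => 10, 'num_output_rows' => 9)
    rescue Error => e
      puts e.message  #=> ABORT: `num_input_rows (10)` and `num_output_rows (9)` do not match
    end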