lib/embulk/output/bigquery.rb in embulk-output-bigquery-0.3.0 vs lib/embulk/output/bigquery.rb in embulk-output-bigquery-0.3.1
- old
+ new
@@ -42,10 +42,11 @@
'table_name_old' => config.param('table_name_old', :string, :default => nil), # lower version compatibility
'auto_create_dataset' => config.param('auto_create_dataset', :bool, :default => false),
'auto_create_table' => config.param('auto_create_table', :bool, :default => false),
'schema_file' => config.param('schema_file', :string, :default => nil),
'template_table' => config.param('template_table', :string, :default => nil),
+
'delete_from_local_when_job_end' => config.param('delete_from_local_when_job_end', :bool, :default => true),
'job_status_max_polling_time' => config.param('job_status_max_polling_time', :integer, :default => 3600),
'job_status_polling_interval' => config.param('job_status_polling_interval', :integer, :default => 10),
'is_skip_job_result_check' => config.param('is_skip_job_result_check', :bool, :default => false),
'prevent_duplicate_insert' => config.param('prevent_duplicate_insert', :bool, :default => false),
@@ -60,10 +61,11 @@
'timeout_sec' => config.param('timeout_sec', :integer, :default => 300),
'open_timeout_sec' => config.param('open_timeout_sec', :integer, :default => 300),
'retries' => config.param('retries', :integer, :default => 5),
'application_name' => config.param('application_name', :string, :default => 'Embulk BigQuery plugin'),
+ 'sdk_log_level' => config.param('sdk_log_level', :string, :default => nil),
'path_prefix' => config.param('path_prefix', :string, :default => nil),
'sequence_format' => config.param('sequence_format', :string, :default => '.%d.%d'),
'file_ext' => config.param('file_ext', :string, :default => nil),
'skip_file_generation' => config.param('skip_file_generation', :bool, :default => false),
@@ -199,10 +201,14 @@
if task['with_rehearsal']
task['rehearsal_table'] ||= "LOAD_REHEARSAL_#{unique_name}_#{task['table']}"
end
+ if task['sdk_log_level']
+ Google::Apis.logger.level = eval("::Logger::#{task['sdk_log_level'].upcase}")
+ end
+
task
end
def self.bigquery
@bigquery
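
The eval in the new sdk_log_level handling only resolves a Logger severity
constant ('debug' => ::Logger::DEBUG, and so on). A minimal eval-free sketch
with the same effect for valid level names, assuming the same task hash (a
hypothetical alternative, not what 0.3.1 ships):

    require 'logger'

    # Logger severities are plain integer constants, so const_get can
    # resolve the configured name without evaluating arbitrary code.
    if task['sdk_log_level']
      Google::Apis.logger.level = ::Logger.const_get(task['sdk_log_level'].upcase)
    end
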
@@ -218,20 +224,20 @@
def self.rehearsal_thread=(rehearsal_thread)
@rehearsal_thread = rehearsal_thread
end
- def self.transaction_report(task_reports, responses)
- num_input_rows = task_reports.inject(0) do |sum, task_report|
- sum + task_report['num_input_rows']
- end
- num_output_rows = responses.inject(0) do |sum, response|
+ def self.transaction_report(file_writers, responses, target_table)
+ num_input_rows = file_writers.empty? ? 0 : file_writers.map(&:num_rows).inject(:+)
+ num_response_rows = responses.inject(0) do |sum, response|
sum + (response ? response.statistics.load.output_rows.to_i : 0)
end
+ num_output_rows = bigquery.get_table(target_table).num_rows.to_i
num_rejected_rows = num_input_rows - num_output_rows
transaction_report = {
'num_input_rows' => num_input_rows,
+ 'num_response_rows' => num_response_rows,
'num_output_rows' => num_output_rows,
'num_rejected_rows' => num_rejected_rows,
}
end
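
The reworked transaction_report separates three counts: rows written locally
by the FileWriters (num_input_rows), rows acknowledged by the load-job
responses (num_response_rows), and rows actually present in the target table
after the load (num_output_rows, fetched via get_table). Rejections are now
measured against the table itself. A stub illustration of the arithmetic,
with made-up figures:

    num_input_rows    = [400, 600].inject(:+)             # FileWriter#num_rows, per writer
    num_response_rows = [395, 600].inject(:+)             # load statistics, per response
    num_output_rows   = 995                               # target table's num_rows after load
    num_rejected_rows = num_input_rows - num_output_rows  # => 5
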
@@ -276,16 +282,16 @@
if task['skip_file_generation']
yield(task) # does nothing, but it seems it has to be called
path_pattern = "#{task['path_prefix']}*#{task['file_ext']}"
Embulk.logger.info { "embulk-output-bigquery: Skip file generation. Get paths from `#{path_pattern}`" }
paths = Dir.glob(path_pattern)
- task_reports = paths.map {|path| { 'num_input_rows' => 0 } }
else
task_reports = yield(task) # generates local files
- Embulk.logger.info { "embulk-output-bigquery: task_reports: #{task_reports.to_json}" }
- paths = FileWriter.paths
- FileWriter.ios.values.each do |io|
+
+ ios = file_writers.map(&:io)
+ paths = ios.map(&:path)
+ ios.each do |io|
Embulk.logger.debug { "close #{io.path}" }
io.close rescue nil
end
end
@@ -296,11 +302,11 @@
if task['skip_load'] # only for debug
Embulk.logger.info { "embulk-output-bigquery: Skip load" }
else
target_table = task['temp_table'] ? task['temp_table'] : task['table']
responses = bigquery.load_in_parallel(paths, target_table)
- transaction_report = self.transaction_report(task_reports, responses)
+ transaction_report = self.transaction_report(file_writers, responses, target_table)
Embulk.logger.info { "embulk-output-bigquery: transaction_report: #{transaction_report.to_json}" }
if task['mode'] == 'replace_backup'
bigquery.copy(task['table'], task['table_old'], task['dataset_old'])
end
@@ -337,53 +343,79 @@
# this is for -c next_config option; add some parameters for the next execution if wanted
next_config_diff = {}
return next_config_diff
end
- # instance is created on each thread
+ @file_writers_mutex = Mutex.new
+ @file_writers = Array.new
+
+ def self.reset_file_writers
+ @file_writers = Array.new
+ end
+
+ def self.file_writers
+ @file_writers
+ end
+
+ def self.add_file_writer(file_writer)
+ @file_writers_mutex.synchronize do
+ @file_writers << file_writer
+ end
+ end
+
+ FILE_WRITER_KEY = :embulk_output_bigquery_file_writer
+
+ # Create one FileWriter object per output thread, shared among tasks.
+ # These shared objects are closed in #transaction.
+ # This is mainly to suppress (or control via -X max_threads) the number of
+ # files, which equals the concurrency of the parallel load, when the number
+ # of input tasks is large.
+
+ # #file_writer must be called only from #add because other methods run on
+ # different (non-output) threads. Note also that #add of the same task
+ # instance may be called from different output threads.
+ def file_writer
+ return Thread.current[FILE_WRITER_KEY] if Thread.current[FILE_WRITER_KEY]
+ file_writer = FileWriter.new(@task, @schema, @index, self.class.converters)
+ self.class.add_file_writer(file_writer)
+ Thread.current[FILE_WRITER_KEY] = file_writer
+ end
+
+ # an instance is created for each task
def initialize(task, schema, index)
super
if task['with_rehearsal'] and @index == 0
@rehearsaled = false
- @num_rows = 0
end
-
- unless task['skip_file_generation']
- @file_writer = FileWriter.new(task, schema, index, self.class.converters)
- end
end
- # called for each page in each thread
+ # called once per task, when the task finishes
def close
end
- # called for each page in each thread
+ # called for each page in each task
def add(page)
+ return if task['skip_file_generation']
+ num_rows = file_writer.add(page)
+
if task['with_rehearsal'] and @index == 0 and !@rehearsaled
- page = page.to_a # to avoid https://github.com/embulk/embulk/issues/403
- if @num_rows >= task['rehearsal_counts']
+ if num_rows >= task['rehearsal_counts']
load_rehearsal
@rehearsaled = true
end
- @num_rows += page.to_a.size
end
-
- unless task['skip_file_generation']
- @file_writer.add(page)
- end
end
def load_rehearsal
bigquery = self.class.bigquery
Embulk.logger.info { "embulk-output-bigquery: Rehearsal started" }
- io = @file_writer.close # need to close once for gzip
+ io = file_writer.close # need to close once for gzip
rehearsal_path = "#{io.path}.rehearsal"
Embulk.logger.debug { "embulk_output_bigquery: cp #{io.path} #{rehearsal_path}" }
FileUtils.cp(io.path, rehearsal_path)
- @file_writer.reopen
+ file_writer.reopen
self.class.rehearsal_thread = Thread.new do
begin
bigquery.create_table(task['rehearsal_table'])
response = bigquery.load(rehearsal_path, task['rehearsal_table'])
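
The file-writer sharing introduced in this hunk reduces to two pieces: a
thread-local memo so each output thread builds at most one FileWriter, and a
mutex-guarded class-level registry so #transaction can later enumerate and
close them all. A stripped-down sketch of the pattern, with illustrative
names rather than the plugin's:

    class WriterRegistry
      KEY = :example_writer
      @writers = []
      @mutex = Mutex.new

      class << self
        attr_reader :writers

        # One writer per output thread: tasks scheduled on the same
        # thread reuse it, so the file count tracks the thread count
        # (-X max_threads) instead of the input task count.
        def for_current_thread(&factory)
          Thread.current[KEY] ||= factory.call.tap do |w|
            @mutex.synchronize { @writers << w }
          end
        end
      end
    end

A task's #add would then call WriterRegistry.for_current_thread { FileWriter.new(...) },
mirroring the file_writer helper above.
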
@@ -401,16 +433,12 @@
end
def abort
end
- # called after processing all pages in each thread, returns a task_report
+ # called after processing all pages in each task, returns a task_report
def commit
- unless task['skip_file_generation']
- @file_writer.commit
- else
- {}
- end
+ {}
end
end
end
end