lib/embulk/output/bigquery.rb in embulk-output-bigquery-0.3.0 vs lib/embulk/output/bigquery.rb in embulk-output-bigquery-0.3.1

- old
+ new

@@ -42,10 +42,11 @@ 'table_name_old' => config.param('table_name_old', :string, :default => nil), # lower version compatibility 'auto_create_dataset' => config.param('auto_create_dataset', :bool, :default => false), 'auto_create_table' => config.param('auto_create_table', :bool, :default => false), 'schema_file' => config.param('schema_file', :string, :default => nil), 'template_table' => config.param('template_table', :string, :default => nil), + 'delete_from_local_when_job_end' => config.param('delete_from_local_when_job_end', :bool, :default => true), 'job_status_max_polling_time' => config.param('job_status_max_polling_time', :integer, :default => 3600), 'job_status_polling_interval' => config.param('job_status_polling_interval', :integer, :default => 10), 'is_skip_job_result_check' => config.param('is_skip_job_result_check', :bool, :default => false), 'prevent_duplicate_insert' => config.param('prevent_duplicate_insert', :bool, :default => false), @@ -60,10 +61,11 @@ 'timeout_sec' => config.param('timeout_sec', :integer, :default => 300), 'open_timeout_sec' => config.param('open_timeout_sec', :integer, :default => 300), 'retries' => config.param('retries', :integer, :default => 5), 'application_name' => config.param('application_name', :string, :default => 'Embulk BigQuery plugin'), + 'sdk_log_level' => config.param('sdk_log_level', :string, :default => nil), 'path_prefix' => config.param('path_prefix', :string, :default => nil), 'sequence_format' => config.param('sequence_format', :string, :default => '.%d.%d'), 'file_ext' => config.param('file_ext', :string, :default => nil), 'skip_file_generation' => config.param('skip_file_generation', :bool, :default => false), @@ -199,10 +201,14 @@ if task['with_rehearsal'] task['rehearsal_table'] ||= "LOAD_REHEARSAL_#{unique_name}_#{task['table']}" end + if task['sdk_log_level'] + Google::Apis.logger.level = eval("::Logger::#{task['sdk_log_level'].upcase}") + end + task end def self.bigquery @bigquery @@ -218,20 +224,20 @@ def self.rehearsal_thread=(rehearsal_thread) @rehearsal_thread = rehearsal_thread end - def self.transaction_report(task_reports, responses) - num_input_rows = task_reports.inject(0) do |sum, task_report| - sum + task_report['num_input_rows'] - end - num_output_rows = responses.inject(0) do |sum, response| + def self.transaction_report(file_writers, responses, target_table) + num_input_rows = file_writers.empty? ? 0 : file_writers.map(&:num_rows).inject(:+) + num_response_rows = responses.inject(0) do |sum, response| sum + (response ? response.statistics.load.output_rows.to_i : 0) end + num_output_rows = bigquery.get_table(target_table).num_rows.to_i num_rejected_rows = num_input_rows - num_output_rows transaction_report = { 'num_input_rows' => num_input_rows, + 'num_response_rows' => num_response_rows, 'num_output_rows' => num_output_rows, 'num_rejected_rows' => num_rejected_rows, } end @@ -276,16 +282,16 @@ if task['skip_file_generation'] yield(task) # does nothing, but seems it has to be called path_pattern = "#{task['path_prefix']}*#{task['file_ext']}" Embulk.logger.info { "embulk-output-bigquery: Skip file generation. Get paths from `#{path_pattern}`" } paths = Dir.glob(path_pattern) - task_reports = paths.map {|path| { 'num_input_rows' => 0 } } else task_reports = yield(task) # generates local files - Embulk.logger.info { "embulk-output-bigquery: task_reports: #{task_reports.to_json}" } - paths = FileWriter.paths - FileWriter.ios.values.each do |io| + + ios = file_writers.map(&:io) + paths = ios.map(&:path) + ios.each do |io| Embulk.logger.debug { "close #{io.path}" } io.close rescue nil end end @@ -296,11 +302,11 @@ if task['skip_load'] # only for debug Embulk.logger.info { "embulk-output-bigquery: Skip load" } else target_table = task['temp_table'] ? task['temp_table'] : task['table'] responses = bigquery.load_in_parallel(paths, target_table) - transaction_report = self.transaction_report(task_reports, responses) + transaction_report = self.transaction_report(file_writers, responses, target_table) Embulk.logger.info { "embulk-output-bigquery: transaction_report: #{transaction_report.to_json}" } if task['mode'] == 'replace_backup' bigquery.copy(task['table'], task['table_old'], task['dataset_old']) end @@ -337,53 +343,79 @@ # this is for -c next_config option, add some paramters for next execution if wants next_config_diff = {} return next_config_diff end - # instance is created on each thread + @file_writers_mutex = Mutex.new + @file_writers = Array.new + + def self.reset_file_writers + @file_writers = Array.new + end + + def self.file_writers + @file_writers + end + + def self.add_file_writer(file_writer) + @file_writers_mutex.synchronize do + @file_writers << file_writer + end + end + + FILE_WRITER_KEY = :embulk_output_bigquery_file_writer + + # Create one FileWriter object for one output thread, that is, share among tasks. + # Close theses shared objects in transaction. + # This is mainly to suppress (or control by -X max_threads) number of files, which + # equals to number of concurrency to load in parallel, when number of input tasks is many + # + # #file_writer must be called at only #add because threads in other methods + # are different (called from non-output threads). Note also that #add method + # of the same task instance would be called in different output threads + def file_writer + return Thread.current[FILE_WRITER_KEY] if Thread.current[FILE_WRITER_KEY] + file_writer = FileWriter.new(@task, @schema, @index, self.class.converters) + self.class.add_file_writer(file_writer) + Thread.current[FILE_WRITER_KEY] = file_writer + end + + # instance is created on each task def initialize(task, schema, index) super if task['with_rehearsal'] and @index == 0 @rehearsaled = false - @num_rows = 0 end - - unless task['skip_file_generation'] - @file_writer = FileWriter.new(task, schema, index, self.class.converters) - end end - # called for each page in each thread + # called for each page in each task def close end - # called for each page in each thread + # called for each page in each task def add(page) + return if task['skip_file_generation'] + num_rows = file_writer.add(page) + if task['with_rehearsal'] and @index == 0 and !@rehearsaled - page = page.to_a # to avoid https://github.com/embulk/embulk/issues/403 - if @num_rows >= task['rehearsal_counts'] + if num_rows >= task['rehearsal_counts'] load_rehearsal @rehearsaled = true end - @num_rows += page.to_a.size end - - unless task['skip_file_generation'] - @file_writer.add(page) - end end def load_rehearsal bigquery = self.class.bigquery Embulk.logger.info { "embulk-output-bigquery: Rehearsal started" } - io = @file_writer.close # need to close once for gzip + io = file_writer.close # need to close once for gzip rehearsal_path = "#{io.path}.rehearsal" Embulk.logger.debug { "embulk_output_bigquery: cp #{io.path} #{rehearsal_path}" } FileUtils.cp(io.path, rehearsal_path) - @file_writer.reopen + file_writer.reopen self.class.rehearsal_thread = Thread.new do begin bigquery.create_table(task['rehearsal_table']) response = bigquery.load(rehearsal_path, task['rehearsal_table']) @@ -401,16 +433,12 @@ end def abort end - # called after processing all pages in each thread, returns a task_report + # called after processing all pages in each task, returns a task_report def commit - unless task['skip_file_generation'] - @file_writer.commit - else - {} - end + {} end end end end