lib/embulk/output/bigquery.rb in embulk-output-bigquery-0.3.0.pre5 vs lib/embulk/output/bigquery.rb in embulk-output-bigquery-0.3.0.pre6

- old
+ new

@@ -1,7 +1,8 @@ require 'json' require 'tempfile' +require 'fileutils' require_relative 'bigquery/bigquery_client' require_relative 'bigquery/file_writer' require_relative 'bigquery/value_converter_factory' module Embulk @@ -209,27 +210,31 @@ def self.converters @converters end + def self.rehearsal_thread + @rehearsal_thread + end + + def self.rehearsal_thread=(rehearsal_thread) + @rehearsal_thread = rehearsal_thread + end + def self.transaction_report(task_reports, responses) + num_input_rows = task_reports.inject(0) do |sum, task_report| + sum + task_report['num_input_rows'] + end + num_output_rows = responses.inject(0) do |sum, response| + sum + (response ? response.statistics.load.output_rows.to_i : 0) + end + num_rejected_rows = num_input_rows - num_output_rows transaction_report = { - 'num_input_rows' => 0, - 'num_output_rows' => 0, - 'num_rejected_rows' => 0, + 'num_input_rows' => num_input_rows, + 'num_output_rows' => num_output_rows, + 'num_rejected_rows' => num_rejected_rows, } - (0...task_reports.size).each do |idx| - task_report = task_reports[idx] - response = responses[idx] - num_input_rows = task_report['num_input_rows'] - num_output_rows = response ? response.statistics.load.output_rows.to_i : 0 - num_rejected_rows = num_input_rows - num_output_rows - transaction_report['num_input_rows'] += num_input_rows - transaction_report['num_output_rows'] += num_output_rows - transaction_report['num_rejected_rows'] += num_rejected_rows - end - transaction_report end def self.transaction(config, schema, task_count, &control) task = self.configure(config, schema, task_count) @@ -276,13 +281,20 @@ task_reports = paths.map {|path| { 'num_input_rows' => 0 } } else task_reports = yield(task) # generates local files Embulk.logger.info { "embulk-output-bigquery: task_reports: #{task_reports.to_json}" } paths = FileWriter.paths - FileWriter.ios.each {|io| io.close rescue nil } + FileWriter.ios.values.each do |io| + Embulk.logger.debug { "close #{io.path}" } + io.close rescue nil + end end + if rehearsal_thread + rehearsal_thread.join + end + if task['skip_load'] # only for debug Embulk.logger.info { "embulk-output-bigquery: Skip load" } else target_table = task['temp_table'] ? task['temp_table'] : task['table'] responses = bigquery.load_in_parallel(paths, target_table) @@ -330,11 +342,10 @@ # instance is created on each thread def initialize(task, schema, index) super if task['with_rehearsal'] and @index == 0 - @bigquery = self.class.bigquery @rehearsaled = false @num_rows = 0 end unless task['skip_file_generation'] @@ -349,23 +360,41 @@ # called for each page in each thread def add(page) if task['with_rehearsal'] and @index == 0 and !@rehearsaled page = page.to_a # to avoid https://github.com/embulk/embulk/issues/403 if @num_rows >= task['rehearsal_counts'] - Embulk.logger.info { "embulk-output-bigquery: Rehearsal started" } - begin - @bigquery.create_table(task['rehearsal_table']) - @bigquery.load(FileWriter.paths.first, task['rehearsal_table']) - ensure - @bigquery.delete_table(task['rehearsal_table']) - end + load_rehearsal @rehearsaled = true end @num_rows += page.to_a.size end unless task['skip_file_generation'] @file_writer.add(page) + end + end + + def load_rehearsal + bigquery = self.class.bigquery + Embulk.logger.info { "embulk-output-bigquery: Rehearsal started" } + + io = @file_writer.close # need to close once for gzip + rehearsal_path = "#{io.path}.rehearsal" + Embulk.logger.debug { "embulk_output_bigquery: cp #{io.path} #{rehearsal_path}" } + FileUtils.cp(io.path, rehearsal_path) + @file_writer.reopen + + self.class.rehearsal_thread = Thread.new do + begin + bigquery.create_table(task['rehearsal_table']) + response = bigquery.load(rehearsal_path, task['rehearsal_table']) + num_output_rows = response ? response.statistics.load.output_rows.to_i : 0 + Embulk.logger.info { "embulk-output-bigquery: Loaded rehearsal #{num_output_rows}" } + ensure + Embulk.logger.debug { "embulk_output_bigquery: delete #{rehearsal_path}" } + File.unlink(rehearsal_path) rescue nil + bigquery.delete_table(task['rehearsal_table']) + end end end def finish end