lib/embulk/output/bigquery.rb in embulk-output-bigquery-0.3.0.pre5 vs lib/embulk/output/bigquery.rb in embulk-output-bigquery-0.3.0.pre6
- old
+ new
@@ -1,7 +1,8 @@
require 'json'
require 'tempfile'
+require 'fileutils'
require_relative 'bigquery/bigquery_client'
require_relative 'bigquery/file_writer'
require_relative 'bigquery/value_converter_factory'
module Embulk
@@ -209,27 +210,31 @@
# Class-level reader for the `@converters` class instance variable.
#
# @return [Object, nil] whatever was last stored in `@converters`
#   (nil when it has never been assigned). Presumably the value
#   converters built during configuration -- confirm against the
#   code that assigns it.
def self.converters
  @converters
end
# Class-level reader for the background rehearsal thread.
#
# @return [Thread, nil] the value stored by `rehearsal_thread=`,
#   or nil when no rehearsal has been started
def self.rehearsal_thread
  @rehearsal_thread
end
+
# Class-level writer for the background rehearsal thread.
#
# @param rehearsal_thread [Thread, nil] thread to remember so that it can
#   be joined later
# @return [Thread, nil] the assigned value
def self.rehearsal_thread=(rehearsal_thread)
  @rehearsal_thread = rehearsal_thread
end
+
# Builds the row-count summary for a transaction.
#
# Input rows and output rows are totalled independently, so the two
# collections do not have to be the same length.
#
# @param task_reports [Array<Hash>] per-task reports; each entry must hold a
#   numeric 'num_input_rows'
# @param responses [Array] load responses; nil/false entries contribute zero
#   rows, every other entry must respond to `statistics.load.output_rows`
# @return [Hash] with keys 'num_input_rows', 'num_output_rows' and
#   'num_rejected_rows' (input minus output)
def self.transaction_report(task_reports, responses)
  total_input = 0
  task_reports.each { |report| total_input += report['num_input_rows'] }

  total_output = 0
  responses.each do |res|
    # A missing response counts as zero loaded rows.
    total_output += res.statistics.load.output_rows.to_i if res
  end

  {
    'num_input_rows' => total_input,
    'num_output_rows' => total_output,
    'num_rejected_rows' => total_input - total_output,
  }
end
def self.transaction(config, schema, task_count, &control)
task = self.configure(config, schema, task_count)
@@ -276,13 +281,20 @@
task_reports = paths.map {|path| { 'num_input_rows' => 0 } }
else
task_reports = yield(task) # generates local files
Embulk.logger.info { "embulk-output-bigquery: task_reports: #{task_reports.to_json}" }
paths = FileWriter.paths
- FileWriter.ios.each {|io| io.close rescue nil }
+ FileWriter.ios.values.each do |io|
+ Embulk.logger.debug { "close #{io.path}" }
+ io.close rescue nil
+ end
end
+ if rehearsal_thread
+ rehearsal_thread.join
+ end
+
if task['skip_load'] # only for debug
Embulk.logger.info { "embulk-output-bigquery: Skip load" }
else
target_table = task['temp_table'] ? task['temp_table'] : task['table']
responses = bigquery.load_in_parallel(paths, target_table)
@@ -330,11 +342,10 @@
# instance is created on each thread
def initialize(task, schema, index)
super
if task['with_rehearsal'] and @index == 0
- @bigquery = self.class.bigquery
@rehearsaled = false
@num_rows = 0
end
unless task['skip_file_generation']
@@ -349,23 +360,41 @@
# called for each page in each thread
# called for each page in each thread
#
# While rehearsal is enabled, this is the index-0 instance, and no rehearsal
# has run yet, the page is materialized into an Array and its rows are
# counted. The threshold is checked BEFORE the current page is counted, so
# `load_rehearsal` fires on the first page that arrives after
# `task['rehearsal_counts']` rows have already been seen.
#
# @param page [#to_a] the page of records to write
# @return [Object, nil] result of `@file_writer.add`, or nil when
#   `task['skip_file_generation']` is set
def add(page)
  rehearsal_pending = task['with_rehearsal'] && @index == 0 && !@rehearsaled

  if rehearsal_pending
    page = page.to_a # to avoid https://github.com/embulk/embulk/issues/403
    if @num_rows >= task['rehearsal_counts']
      load_rehearsal
      @rehearsaled = true
    end
    @num_rows += page.size
  end

  @file_writer.add(page) unless task['skip_file_generation']
end
+
# Starts a rehearsal load in a background thread.
#
# The file written so far is closed, copied to "<path>.rehearsal" and the
# writer is reopened, so regular writing can continue while the copy is
# loaded. The thread creates `task['rehearsal_table']`, loads the copy into
# it and logs the number of loaded rows. Whether or not the load succeeds,
# the copy and the rehearsal table are removed afterwards.
#
# The thread is stored through `self.class.rehearsal_thread=`; it is not
# joined here.
#
# Expects `@file_writer.close` to return an object responding to `path`.
#
# @return [Thread] the started rehearsal thread
def load_rehearsal
  bigquery = self.class.bigquery
  Embulk.logger.info { "embulk-output-bigquery: Rehearsal started" }

  io = @file_writer.close # need to close once for gzip
  rehearsal_path = "#{io.path}.rehearsal"
  Embulk.logger.debug { "embulk-output-bigquery: cp #{io.path} #{rehearsal_path}" }
  FileUtils.cp(io.path, rehearsal_path)
  @file_writer.reopen

  self.class.rehearsal_thread = Thread.new do
    # Explicit begin/ensure: `ensure` directly inside do...end needs Ruby 2.5+.
    begin
      bigquery.create_table(task['rehearsal_table'])
      response = bigquery.load(rehearsal_path, task['rehearsal_table'])
      num_output_rows = response ? response.statistics.load.output_rows.to_i : 0
      Embulk.logger.info { "embulk-output-bigquery: Loaded rehearsal #{num_output_rows}" }
    ensure
      Embulk.logger.debug { "embulk-output-bigquery: delete #{rehearsal_path}" }
      # Best-effort cleanup: rm_f ignores a missing file and other StandardErrors.
      FileUtils.rm_f(rehearsal_path)
      bigquery.delete_table(task['rehearsal_table'])
    end
  end
end
# Intentionally a no-op: nothing is done here when `finish` is invoked.
# Presumably an Embulk output-plugin lifecycle hook -- confirm against the
# OutputPlugin base class.
#
# @return [nil]
def finish; end