lib/embulk/output/bigquery.rb in embulk-output-bigquery-0.2.3 vs lib/embulk/output/bigquery.rb in embulk-output-bigquery-0.3.0.pre1
- old
+ new
@@ -1,3 +1,388 @@
-Embulk::JavaPlugin.register_output(
- "bigquery", "org.embulk.output.BigqueryOutputPlugin",
- File.expand_path('../../../../classpath', __FILE__))
+require 'json'
+require 'tempfile'
+require_relative 'bigquery/bigquery_client'
+require_relative 'bigquery/file_writer'
+require_relative 'bigquery/value_converter_factory'
+
+module Embulk
+ module Output
+ class Bigquery < OutputPlugin
+ Plugin.register_output('bigquery', self)
+
+ class Error < StandardError; end
+
+      # To support configuration like below, as org.embulk.spi.unit.LocalFile does:
+ #
+ # json_keyfile:
+ # content: |
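+      #
+      # A plain path string is also accepted (e.g. `json_keyfile: /path/to/keyfile.json`).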
+ class LocalFile
+        # @return [String] JSON keyfile content
+ def self.load(v)
+ if v.is_a?(String) # path
+ File.read(v)
+ elsif v.is_a?(Hash)
+ v['content']
+ end
+ end
+ end
+
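+      # An illustrative minimal configuration (values below are placeholders):
+      #
+      #   out:
+      #     type: bigquery
+      #     auth_method: json_key
+      #     json_keyfile: /path/to/keyfile.json
+      #     dataset: my_dataset
+      #     table: my_table
+      #     auto_create_table: true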
+ def self.configure(config, schema, processor_count)
+ task = {
+ 'mode' => config.param('mode', :string, :default => 'append'),
+ 'auth_method' => config.param('auth_method', :string, :default => 'private_key'),
+ 'service_account_email' => config.param('service_account_email', :string, :default => nil),
+ 'p12_keyfile' => config.param('p12_keyfile', :string, :default => nil),
+ 'json_keyfile' => config.param('json_keyfile', LocalFile, :default => nil),
+ 'project' => config.param('project', :string, :default => nil),
+ 'dataset' => config.param('dataset', :string),
+ 'table' => config.param('table', :string),
+ 'dataset_old' => config.param('dataset_old', :string, :default => nil),
+ 'table_old' => config.param('table_old', :string, :default => nil),
+          'table_name_old' => config.param('table_name_old', :string, :default => nil), # for backward compatibility
+ 'auto_create_dataset' => config.param('auto_create_dataset', :bool, :default => false),
+ 'auto_create_table' => config.param('auto_create_table', :bool, :default => false),
+ 'schema_file' => config.param('schema_file', :string, :default => nil),
+ 'template_table' => config.param('template_table', :string, :default => nil),
+ 'delete_from_local_when_job_end' => config.param('delete_from_local_when_job_end', :bool, :default => true),
+ 'job_status_max_polling_time' => config.param('job_status_max_polling_time', :integer, :default => 3600),
+ 'job_status_polling_interval' => config.param('job_status_polling_interval', :integer, :default => 10),
+ 'is_skip_job_result_check' => config.param('is_skip_job_result_check', :bool, :default => false),
+ 'prevent_duplicate_insert' => config.param('prevent_duplicate_insert', :bool, :default => false),
+ 'with_rehearsal' => config.param('with_rehearsal', :bool, :default => false),
+ 'rehearsal_counts' => config.param('rehearsal_counts', :integer, :default => 1000),
+
+ 'column_options' => config.param('column_options', :array, :default => []),
+ 'default_timezone' => config.param('default_timezone', :string, :default => ValueConverterFactory::DEFAULT_TIMEZONE),
+ 'default_timestamp_format' => config.param('default_timestamp_format', :string, :default => ValueConverterFactory::DEFAULT_TIMESTAMP_FORMAT),
+ 'payload_column' => config.param('payload_column', :string, :default => nil),
+ 'payload_column_index' => config.param('payload_column_index', :integer, :default => nil),
+
+ 'timeout_sec' => config.param('timeout_sec', :integer, :default => 300),
+ 'open_timeout_sec' => config.param('open_timeout_sec', :integer, :default => 300),
+ 'retries' => config.param('retries', :integer, :default => 5),
+ 'application_name' => config.param('application_name', :string, :default => 'Embulk BigQuery plugin'),
+
+ 'path_prefix' => config.param('path_prefix', :string, :default => nil),
+ 'sequence_format' => config.param('sequence_format', :string, :default => '.%d.%03d'),
+ 'file_ext' => config.param('file_ext', :string, :default => nil),
+ 'skip_file_generation' => config.param('skip_file_generation', :bool, :default => false),
+ 'compression' => config.param('compression', :string, :default => 'NONE'),
+
+ 'source_format' => config.param('source_format', :string, :default => 'CSV'),
+ 'max_bad_records' => config.param('max_bad_records', :integer, :default => 0),
+ 'field_delimiter' => config.param('field_delimiter', :string, :default => ','),
+ 'encoding' => config.param('encoding', :string, :default => 'UTF-8'),
+ 'ignore_unknown_values' => config.param('ignore_unknown_values', :bool, :default => false),
+ 'allow_quoted_newlines' => config.param('allow_quoted_newlines', :bool, :default => false),
+
+ # for debug
+ 'skip_load' => config.param('skip_load', :bool, :default => false),
+ 'temp_table' => config.param('temp_table', :string, :default => nil),
+ 'rehearsal_table' => config.param('rehearsal_table', :string, :default => nil),
+ }
+
+ now = Time.now
+
+ task['mode'] = task['mode'].downcase
+ unless %w[append append_direct replace delete_in_advance replace_backup].include?(task['mode'])
+ raise ConfigError.new "`mode` must be one of append, append_direct, replace, delete_in_advance, replace_backup"
+ end
+
+ if task['mode'] == 'replace_backup'
+          task['table_old'] ||= task['table_name_old'] # for backward compatibility
+ if task['dataset_old'].nil? and task['table_old'].nil?
+            raise ConfigError.new "`mode replace_backup` requires either `dataset_old` or `table_old`"
+ end
+ task['dataset_old'] ||= task['dataset']
+ task['table_old'] ||= task['table']
+ end
+
+ if task['table_old']
+ task['table_old'] = now.strftime(task['table_old'])
+ end
+ if task['table']
+ task['table'] = now.strftime(task['table'])
+ end
+
+ task['auth_method'] = task['auth_method'].downcase
+ unless %w[private_key json_key compute_engine].include?(task['auth_method'])
+ raise ConfigError.new "`auth_method` must be one of private_key, json_key, compute_engine"
+ end
+ if task['auth_method'] == 'private_key' and task['p12_keyfile'].nil?
+ raise ConfigError.new "`p12_keyfile` is required for auth_method private_key"
+ end
+ if task['auth_method'] == 'json_key' and task['json_keyfile'].nil?
+ raise ConfigError.new "`json_keyfile` is required for auth_method json_key"
+ end
+
+ jsonkey_params = nil
+ if task['json_keyfile']
+ begin
+ jsonkey_params = JSON.parse(task['json_keyfile'])
+ rescue => e
+            raise ConfigError.new "json_keyfile is not valid JSON"
+ end
+ end
+
+ if jsonkey_params
+ task['project'] ||= jsonkey_params['project_id']
+ end
+ if task['project'].nil?
+ raise ConfigError.new "Required field \"project\" is not set"
+ end
+
+ if (task['payload_column'] or task['payload_column_index']) and task['auto_create_table']
+ if task['schema_file'].nil? and task['template_table'].nil?
+            raise ConfigError.new "Cannot guess table schema from Embulk schema with `payload_column` or `payload_column_index`. Either `schema_file` or `template_table` is required when auto_create_table is true"
+ end
+ end
+
+ if task['payload_column_index']
+ if task['payload_column_index'] < 0 || schema.size <= task['payload_column_index']
+            raise ConfigError.new "payload_column_index #{task['payload_column_index']} is out of range (schema has #{schema.size} columns)"
+ end
+ elsif task['payload_column']
+ task['payload_column_index'] = schema.find_index {|c| c[:name] == task['payload_column'] }
+ if task['payload_column_index'].nil?
+ raise ConfigError.new "payload_column #{task['payload_column']} does not exist in schema"
+ end
+ end
+
+ if task['schema_file']
+ unless File.exist?(task['schema_file'])
+            raise ConfigError.new "schema_file #{task['schema_file']} does not exist"
+ end
+ begin
+ JSON.parse(File.read(task['schema_file']))
+ rescue => e
+ raise ConfigError.new "schema_file #{task['schema_file']} is not a JSON file"
+ end
+ end
+
+ if task['path_prefix'].nil?
+ task['path_prefix'] = Tempfile.create('embulk_output_bigquery_') {|fp| fp.path }
+ end
+
+ task['source_format'] = task['source_format'].upcase
+ if task['source_format'] == 'JSONL'
+ task['source_format'] = 'NEWLINE_DELIMITED_JSON'
+ end
+ unless %w[CSV NEWLINE_DELIMITED_JSON].include?(task['source_format'])
+ raise ConfigError.new "`source_format` must be CSV or NEWLINE_DELIMITED_JSON (JSONL)"
+ end
+
+ task['compression'] = task['compression'].upcase
+ unless %w[GZIP NONE].include?(task['compression'])
+ raise ConfigError.new "`compression` must be GZIP or NONE"
+ end
+
+ if task['file_ext'].nil?
+ case task['source_format']
+ when 'CSV'
+ file_ext = '.csv'
+ else # newline_delimited_json
+ file_ext = '.jsonl'
+ end
+ case task['compression']
+ when 'GZIP'
+ file_ext << '.gz'
+ end
+ task['file_ext'] = file_ext
+ end
+
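+        # a process-unique suffix (pid and current time in hex) so that temporary
+        # and rehearsal table names do not collide across runs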
+ unique_name = "%08x%08x%08x" % [Process.pid, now.tv_sec, now.tv_nsec]
+
+ if %w[replace replace_backup append].include?(task['mode'])
+ task['temp_table'] ||= "#{task['table']}_LOAD_TEMP_#{unique_name}"
+ end
+
+ if task['with_rehearsal']
+ task['rehearsal_table'] ||= "#{task['table']}_LOAD_REHEARSAL_#{unique_name}"
+ end
+
+ task
+ end
+
+ def self.bigquery
+ @bigquery
+ end
+
+ def self.converters
+ @converters
+ end
+
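+      # Sums per-task input rows and per-load-job output rows (taken from the
+      # BigQuery job statistics); num_rejected_rows is the difference of the two.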
+ def self.transaction_report(task_reports, responses)
+ transaction_report = {
+ 'num_input_rows' => 0,
+ 'num_output_rows' => 0,
+ 'num_rejected_rows' => 0,
+ }
+ (0...task_reports.size).each do |idx|
+ task_report = task_reports[idx]
+ response = responses[idx]
+ num_input_rows = task_report['num_input_rows']
+ num_output_rows = response ? response.statistics.load.output_rows.to_i : 0
+ num_rejected_rows = num_input_rows - num_output_rows
+ transaction_report['num_input_rows'] += num_input_rows
+ transaction_report['num_output_rows'] += num_output_rows
+ transaction_report['num_rejected_rows'] += num_rejected_rows
+ end
+ transaction_report
+ end
+
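+      # Overall flow: configure the task, prepare dataset/tables according to
+      # `mode`, run the control block to generate local files, load them into
+      # BigQuery (via a temp table for append/replace/replace_backup), then
+      # copy into the destination table and clean up.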
+ def self.transaction(config, schema, processor_count, &control)
+ task = self.configure(config, schema, processor_count)
+
+ @task = task
+ @schema = schema
+ @bigquery = BigqueryClient.new(task, schema)
+ @converters = ValueConverterFactory.create_converters(task, schema)
+
+ if task['auto_create_dataset']
+ bigquery.create_dataset(task['dataset'])
+ else
+ bigquery.get_dataset(task['dataset']) # raises NotFoundError
+ end
+
+ if task['mode'] == 'replace_backup' and task['dataset_old'] != task['dataset']
+ if task['auto_create_dataset']
+ bigquery.create_dataset(task['dataset_old'], reference: task['dataset'])
+ else
+ bigquery.get_dataset(task['dataset_old']) # raises NotFoundError
+ end
+ end
+
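+        # prepare the destination (or temporary) tables according to `mode`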
+ case task['mode']
+ when 'delete_in_advance'
+ bigquery.delete_table(task['table'])
+ bigquery.create_table(task['table'])
+ when 'replace', 'replace_backup', 'append'
+ bigquery.create_table(task['temp_table'])
+ else # append_direct
+ if task['auto_create_table']
+ bigquery.create_table(task['table'])
+ else
+ bigquery.get_table(task['table']) # raises NotFoundError
+ end
+ end
+
+ begin
+ paths = []
+ if task['skip_file_generation']
+            yield(task) # the control block must still be called even though no files are generated here
+ path_pattern = "#{task['path_prefix']}*#{task['file_ext']}"
+ Embulk.logger.info { "embulk-output-bigquery: Skip file generation. Get paths from `#{path_pattern}`" }
+ paths = Dir.glob(path_pattern)
+ task_reports = paths.map {|path| { 'path' => path, 'num_input_rows' => 0 } }
+ else
+ task_reports = yield(task) # generates local files
+ Embulk.logger.info { "embulk-output-bigquery: task_reports: #{task_reports.to_json}" }
+ paths = task_reports.map {|report| report['path'] }
+ end
+
+ if task['skip_load'] # only for debug
+ Embulk.logger.info { "embulk-output-bigquery: Skip load" }
+ else
+ target_table = task['temp_table'] ? task['temp_table'] : task['table']
+ responses = bigquery.load_in_parallel(paths, target_table)
+ transaction_report = self.transaction_report(task_reports, responses)
+ Embulk.logger.info { "embulk-output-bigquery: transaction_report: #{transaction_report.to_json}" }
+
+ if task['mode'] == 'replace_backup'
+ bigquery.copy(task['table'], task['table_old'], task['dataset_old'])
+ end
+
+ if task['temp_table']
+ if task['mode'] == 'append'
+ bigquery.copy(task['temp_table'], task['table'],
+ write_disposition: 'WRITE_APPEND')
+ else # replace or replace_backup
+ bigquery.copy(task['temp_table'], task['table'],
+ write_disposition: 'WRITE_TRUNCATE')
+ end
+ end
+ end
+ ensure
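+          # always drop the temp table and, if configured, the local files,
+          # even when the load above fails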
+ begin
+ if task['temp_table'] # replace or replace_backup
+ bigquery.delete_table(task['temp_table'])
+ end
+ ensure
+ if task['delete_from_local_when_job_end']
+ paths.each do |path|
+ Embulk.logger.info { "delete #{path}" }
+ File.unlink(path) rescue nil
+ end
+ else
+ paths.each do |path|
+ if File.exist?(path)
+ Embulk.logger.info { "#{path} is left" }
+ end
+ end
+ end
+ end
+ end
+
+        # this is for the -c next_config option; add parameters for the next execution if needed
+ next_config_diff = {}
+ return next_config_diff
+ end
+
+      # an instance is created for each thread
+ def initialize(task, schema, index)
+ super
+
+ if task['with_rehearsal'] and @index == 0
+ @bigquery = self.class.bigquery
+ @rehearsaled = false
+ @num_rows = 0
+ end
+
+ unless task['skip_file_generation']
+ @file_writer = FileWriter.new(task, schema, index, self.class.converters)
+ end
+ end
+
+      # called once in each thread when the output is closed
+ def close
+ end
+
+ # called for each page in each thread
+ def add(page)
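+        # rehearsal: once the first worker (@index == 0) has buffered more than
+        # `rehearsal_counts` rows, load its local file into a temporary rehearsal
+        # table (then drop it) as a dry run of the real load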
+ if task['with_rehearsal'] and @index == 0 and !@rehearsaled
+ page = page.to_a # to avoid https://github.com/embulk/embulk/issues/403
+ if @num_rows > task['rehearsal_counts']
+ Embulk.logger.info { "embulk-output-bigquery: Rehearsal started" }
+ begin
+ @bigquery.create_table(task['rehearsal_table'])
+ @bigquery.load(@file_writer.path, task['rehearsal_table'])
+ ensure
+ @bigquery.delete_table(task['rehearsal_table'])
+ end
+ @rehearsaled = true
+ end
+          @num_rows += page.size
+ end
+
+ unless task['skip_file_generation']
+ @file_writer.add(page)
+ end
+ end
+
+ def finish
+ end
+
+ def abort
+ end
+
+ # called after processing all pages in each thread, returns a task_report
+ def commit
+        if task['skip_file_generation']
+          {}
+        else
+          @file_writer.commit
+        end
+ end
+ end
+ end
+end