lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.3.3 vs lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.3.4

- old
+ new

@@ -1,80 +1,26 @@
 require 'google/apis/bigquery_v2'
-require 'google/api_client/auth/key_utils'
 require 'json'
 require 'thwait'
+require_relative 'google_client'
 require_relative 'helper'
 
 module Embulk
   module Output
     class Bigquery < OutputPlugin
-      class Error < StandardError; end
-      class JobTimeoutError < Error; end
-      class NotFoundError < Error; end
-
-      class BigqueryClient
+      class BigqueryClient < GoogleClient
         def initialize(task, schema, fields = nil)
-          @task = task
-          @schema = schema
+          scope = "https://www.googleapis.com/auth/bigquery"
+          client_class = Google::Apis::BigqueryV2::BigqueryService
+          super(task, scope, client_class)
 
-          @project = task['project']
-          @dataset = task['dataset']
-
+          @schema = schema
           reset_fields(fields) if fields
+          @project = @task['project']
+          @dataset = @task['dataset']
         end
 
-        def client
-          return @cached_client if @cached_client && @cached_client_expiration > Time.now
-
-          client = Google::Apis::BigqueryV2::BigqueryService.new
-          client.client_options.application_name = @task['application_name']
-          client.request_options.retries = @task['retries']
-          client.request_options.timeout_sec = @task['timeout_sec']
-          client.request_options.open_timeout_sec = @task['open_timeout_sec']
-          Embulk.logger.debug { "embulk-output-bigquery: client_options: #{client.client_options.to_h}" }
-          Embulk.logger.debug { "embulk-output-bigquery: request_options: #{client.request_options.to_h}" }
-
-          scope = "https://www.googleapis.com/auth/bigquery"
-
-          case @task['auth_method']
-          when 'private_key'
-            private_key_passphrase = 'notasecret'
-            key = Google::APIClient::KeyUtils.load_from_pkcs12(@task['p12_keyfile'], private_key_passphrase)
-            auth = Signet::OAuth2::Client.new(
-              token_credential_uri: "https://accounts.google.com/o/oauth2/token",
-              audience: "https://accounts.google.com/o/oauth2/token",
-              scope: scope,
-              issuer: @task['service_account_email'],
-              signing_key: key)
-
-          when 'compute_engine'
-            auth = Google::Auth::GCECredentials.new
-
-          when 'json_key'
-            json_key = @task['json_keyfile']
-            if File.exist?(json_key)
-              auth = File.open(json_key) do |f|
-                Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: f, scope: scope)
-              end
-            else
-              key = StringIO.new(json_key)
-              auth = Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: key, scope: scope)
-            end
-
-          when 'application_default'
-            auth = Google::Auth.get_application_default([scope])
-
-          else
-            raise ConfigError, "Unknown auth method: #{@task['auth_method']}"
-          end
-
-          client.authorization = auth
-
-          @cached_client_expiration = Time.now + 1800
-          @cached_client = client
-        end
-
         def fields
           return @fields if @fields
           if @task['schema_file']
             @fields = Helper.deep_symbolize_keys(JSON.parse(File.read(@task['schema_file'])))
           elsif @task['template_table']
@@ -92,10 +38,66 @@
         def reset_fields(fields = nil)
           @fields = fields
           self.fields
         end
 
+        # @param object_uris [Array] array of GCS paths such as gs://bucket/path
+        # @return [Array] responses
+        def load_from_gcs(object_uris, table)
+          begin
+            # As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says,
+            # we should generate the job_id in client code; otherwise, retrying would cause duplication
+            if @task['prevent_duplicate_insert'] and (@task['mode'] == 'append' or @task['mode'] == 'append_direct')
+              job_id = Helper.create_load_job_id(@task, object_uris, fields)
+            else
+              job_id = "embulk_load_job_#{SecureRandom.uuid}"
+            end
+            Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{object_uris} => #{@project}:#{@dataset}.#{table}" }
+
+            body = {
+              job_reference: {
+                project_id: @project,
+                job_id: job_id,
+              },
+              configuration: {
+                load: {
+                  destination_table: {
+                    project_id: @project,
+                    dataset_id: @dataset,
+                    table_id: table,
+                  },
+                  schema: {
+                    fields: fields,
+                  },
+                  write_disposition: 'WRITE_APPEND',
+                  source_format: @task['source_format'],
+                  max_bad_records: @task['max_bad_records'],
+                  field_delimiter: @task['source_format'] == 'CSV' ? @task['field_delimiter'] : nil,
+                  encoding: @task['encoding'],
+                  ignore_unknown_values: @task['ignore_unknown_values'],
+                  allow_quoted_newlines: @task['allow_quoted_newlines'],
+                  source_uris: object_uris,
+                }
+              }
+            }
+            opts = {}
+
+            Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
+            response = client.insert_job(@project, body, opts)
+            unless @task['is_skip_job_result_check']
+              response = wait_load('Load', response)
+            end
+            [response]
+          rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
+            response = {status_code: e.status_code, message: e.message, error_class: e.class}
+            Embulk.logger.error {
+              "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
+            }
+            raise Error, "failed to load #{object_uris} to #{@project}:#{@dataset}.#{table}, response:#{response}"
+          end
+        end
+
         def load_in_parallel(paths, table)
           return [] if paths.empty?
           # You may think that, since a load job is a background job, sending requests in parallel
           # would not improve performance. However, in actual experiments, this parallel
           # loading drastically shortened the waiting time. One jobs.insert takes about 50 sec.
@@ -116,10 +118,10 @@
               [idx, response]
             end
           end
           ThreadsWait.all_waits(*threads) do |th|
             idx, response = th.value # raises errors that occurred in threads
-            responses[idx] = response
+            responses[idx] = response if idx
           end
           responses
         end
 
         def load(path, table)
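The new google_client.rb that this diff requires is not shown here. Judging from the removed client method and the new super(task, scope, client_class) call, the extracted base class plausibly carries the client construction, auth, and caching logic, parameterized by OAuth scope and service class. The following is a reconstruction for orientation only, not the file's actual contents; the constructor signature is implied by the diff, everything else is inferred from the removed code and may differ from the real file.

require 'google/apis/bigquery_v2'
require 'google/api_client/auth/key_utils'
require 'stringio'

# Sketch of the extracted base class. In the gem it would live in
# google_client.rb under Embulk::Output::Bigquery, presumably alongside the
# Error / JobTimeoutError / NotFoundError classes removed above (they are
# still raised by bigquery_client.rb, so they must now be defined in a file
# it requires).
class GoogleClient
  def initialize(task, scope, client_class)
    @task = task
    @scope = scope
    @client_class = client_class
  end

  # Same logic as the removed BigqueryClient#client, but parameterized by
  # @scope and @client_class so any Google API service can reuse it.
  def client
    return @cached_client if @cached_client && @cached_client_expiration > Time.now

    client = @client_class.new
    client.client_options.application_name = @task['application_name']
    client.request_options.retries = @task['retries']
    client.request_options.timeout_sec = @task['timeout_sec']
    client.request_options.open_timeout_sec = @task['open_timeout_sec']

    client.authorization =
      case @task['auth_method']
      when 'private_key'
        key = Google::APIClient::KeyUtils.load_from_pkcs12(@task['p12_keyfile'], 'notasecret')
        Signet::OAuth2::Client.new(
          token_credential_uri: "https://accounts.google.com/o/oauth2/token",
          audience: "https://accounts.google.com/o/oauth2/token",
          scope: @scope,
          issuer: @task['service_account_email'],
          signing_key: key)
      when 'compute_engine'
        Google::Auth::GCECredentials.new
      when 'json_key'
        json_key = @task['json_keyfile']
        io = File.exist?(json_key) ? File.open(json_key) : StringIO.new(json_key)
        Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: io, scope: @scope)
      when 'application_default'
        Google::Auth.get_application_default([@scope])
      else
        raise "Unknown auth method: #{@task['auth_method']}" # ConfigError in Embulk
      end

    @cached_client_expiration = Time.now + 1800
    @cached_client = client
  end
end

Under this split the subclass only supplies what actually differs per Google API, the scope and the service class, which is exactly what the new initialize passes to super.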
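The comment above load_from_gcs refers to BigQuery's advice to generate job IDs in client code, so that a retried insert_job resends the same ID and is rejected as a duplicate job instead of loading the same data twice. Helper.create_load_job_id is not shown in this diff; a deterministic ID would typically be derived by hashing everything that identifies the load, along the lines of this illustrative sketch (not the gem's actual implementation, and the 'table' key is an assumption):

require 'digest/sha1'
require 'json'

# Illustrative only: a stable job id derived from everything that identifies
# the load, so a retry produces the same id and BigQuery deduplicates it.
def create_load_job_id(task, object_uris, fields)
  elements = [
    task['project'], task['dataset'], task['table'], # 'table' key assumed
    object_uris, fields.to_json,
  ]
  "embulk_load_job_#{Digest::SHA1.hexdigest(elements.join("\n"))}"
end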
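The last hunk's "if idx" guard protects responses[idx] = response when a thread's value is not the expected [idx, response] pair (destructuring nil leaves idx nil). The surrounding fan-out pattern, visible in the context lines, can be demonstrated standalone; the sleep and the result strings below are stand-ins for the jobs.insert calls:

require 'thwait'

# Each worker returns [index, result]; ThreadsWait joins threads as they
# finish, and Thread#value re-raises any exception raised inside a thread.
results = Array.new(3)
threads = 3.times.map do |idx|
  Thread.new(idx) do |i|
    sleep(rand * 0.1)    # stand-in for one jobs.insert plus result polling
    [i, "response-#{i}"] # hypothetical response object
  end
end
ThreadsWait.all_waits(*threads) do |th|
  idx, response = th.value
  results[idx] = response if idx # the nil-guard added in 0.3.4
end
p results # => ["response-0", "response-1", "response-2"]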