lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.3.3 vs lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.3.4
- old
+ new
@@ -1,80 +1,26 @@
require 'google/apis/bigquery_v2'
-require 'google/api_client/auth/key_utils'
require 'json'
require 'thwait'
+require_relative 'google_client'
require_relative 'helper'
module Embulk
module Output
class Bigquery < OutputPlugin
- class Error < StandardError; end
- class JobTimeoutError < Error; end
- class NotFoundError < Error; end
-
- class BigqueryClient
+ class BigqueryClient < GoogleClient
def initialize(task, schema, fields = nil)
- @task = task
- @schema = schema
+ scope = "https://www.googleapis.com/auth/bigquery"
+ client_class = Google::Apis::BigqueryV2::BigqueryService
+ super(task, scope, client_class)
- @project = task['project']
- @dataset = task['dataset']
-
+ @schema = schema
reset_fields(fields) if fields
+ @project = @task['project']
+ @dataset = @task['dataset']
end
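(The client construction and authentication logic removed below now lives in the shared GoogleClient base class pulled in by require_relative 'google_client'; the Error/JobTimeoutError/NotFoundError classes removed above presumably move there as well. That file is not part of this diff. A minimal sketch of what the base class plausibly provides, assuming it simply reuses the removed logic; only the GoogleClient name, the initialize(task, scope, client_class) signature, and the client method are implied by this diff, everything else is an assumption:)

  class GoogleClient
    def initialize(task, scope, client_class)
      @task = task
      @scope = scope
      @client_class = client_class
    end

    def client
      # cached client, refreshed every 30 minutes, as in the removed code below
      return @cached_client if @cached_client && @cached_client_expiration > Time.now
      client = @client_class.new
      client.client_options.application_name = @task['application_name']
      client.request_options.retries = @task['retries']
      client.request_options.timeout_sec = @task['timeout_sec']
      client.request_options.open_timeout_sec = @task['open_timeout_sec']
      client.authorization = authorize  # hypothetical helper wrapping the old auth_method case statement
      @cached_client_expiration = Time.now + 1800
      @cached_client = client
    end
  end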
- def client
- return @cached_client if @cached_client && @cached_client_expiration > Time.now
-
- client = Google::Apis::BigqueryV2::BigqueryService.new
- client.client_options.application_name = @task['application_name']
- client.request_options.retries = @task['retries']
- client.request_options.timeout_sec = @task['timeout_sec']
- client.request_options.open_timeout_sec = @task['open_timeout_sec']
- Embulk.logger.debug { "embulk-output-bigquery: client_options: #{client.client_options.to_h}" }
- Embulk.logger.debug { "embulk-output-bigquery: request_options: #{client.request_options.to_h}" }
-
- scope = "https://www.googleapis.com/auth/bigquery"
-
- case @task['auth_method']
- when 'private_key'
- private_key_passphrase = 'notasecret'
- key = Google::APIClient::KeyUtils.load_from_pkcs12(@task['p12_keyfile'], private_key_passphrase)
- auth = Signet::OAuth2::Client.new(
- token_credential_uri: "https://accounts.google.com/o/oauth2/token",
- audience: "https://accounts.google.com/o/oauth2/token",
- scope: scope,
- issuer: @task['service_account_email'],
- signing_key: key)
-
- when 'compute_engine'
- auth = Google::Auth::GCECredentials.new
-
- when 'json_key'
- json_key = @task['json_keyfile']
- if File.exist?(json_key)
- auth = File.open(json_key) do |f|
- Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: f, scope: scope)
- end
- else
- key = StringIO.new(json_key)
- auth = Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: key, scope: scope)
- end
-
- when 'application_default'
- auth = Google::Auth.get_application_default([scope])
-
- else
- raise ConfigError, "Unknown auth method: #{@task['auth_method']}"
- end
-
- client.authorization = auth
-
- @cached_client_expiration = Time.now + 1800
- @cached_client = client
- end
-
def fields
return @fields if @fields
if @task['schema_file']
@fields = Helper.deep_symbolize_keys(JSON.parse(File.read(@task['schema_file'])))
elsif @task['template_table']
@@ -92,10 +38,66 @@
def reset_fields(fields = nil)
@fields = fields
self.fields
end
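(When schema_file is set, the fields method above parses the file with JSON.parse and symbolizes the keys, so the file is expected to hold the BigQuery fields array itself. A hypothetical example, shown here as the Ruby value that @fields would end up holding after Helper.deep_symbolize_keys; the column names and types are illustrative:)

  @fields = [
    { name: 'id',         type: 'INTEGER', mode: 'REQUIRED' },
    { name: 'name',       type: 'STRING' },
    { name: 'created_at', type: 'TIMESTAMP' },
  ]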
+ # @param object_uris [Array] array of GCS object URIs such as gs://bucket/path
+ # @return [Array] responses
+ def load_from_gcs(object_uris, table)
+ begin
+ # As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says,
+ # we should generate the job_id in client code; otherwise, retries could cause duplicate loads
+ if @task['prevent_duplicate_insert'] and (@task['mode'] == 'append' or @task['mode'] == 'append_direct')
+ job_id = Helper.create_load_job_id(@task, path, fields)
+ else
+ job_id = "embulk_load_job_#{SecureRandom.uuid}"
+ end
+ Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{object_uris} => #{@project}:#{@dataset}.#{table}" }
+
+ body = {
+ job_reference: {
+ project_id: @project,
+ job_id: job_id,
+ },
+ configuration: {
+ load: {
+ destination_table: {
+ project_id: @project,
+ dataset_id: @dataset,
+ table_id: table,
+ },
+ schema: {
+ fields: fields,
+ },
+ write_disposition: 'WRITE_APPEND',
+ source_format: @task['source_format'],
+ max_bad_records: @task['max_bad_records'],
+ field_delimiter: @task['source_format'] == 'CSV' ? @task['field_delimiter'] : nil,
+ encoding: @task['encoding'],
+ ignore_unknown_values: @task['ignore_unknown_values'],
+ allow_quoted_newlines: @task['allow_quoted_newlines'],
+ source_uris: object_uris,
+ }
+ }
+ }
+ opts = {}
+
+ Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
+ response = client.insert_job(@project, body, opts)
+ unless @task['is_skip_job_result_check']
+ response = wait_load('Load', response)
+ end
+ [response]
+ rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
+ response = {status_code: e.status_code, message: e.message, error_class: e.class}
+ Embulk.logger.error {
+ "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
+ }
+ raise Error, "failed to load #{object_uris} to #{@project}:#{@dataset}.#{table}, response:#{response}"
+ end
+ end
+
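(A hedged usage sketch of the new method, assuming the caller has already staged compressed CSV parts in GCS; the bucket, object names, and table name below are hypothetical, while the method name, its two arguments, and the one-element response array come from the code above:)

  object_uris = [
    'gs://my-bucket/embulk_output/part-000.csv.gz',
    'gs://my-bucket/embulk_output/part-001.csv.gz',
  ]
  responses = bigquery_client.load_from_gcs(object_uris, 'target_table')  # => [response]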
def load_in_parallel(paths, table)
return [] if paths.empty?
# You might think that, since a load job runs in the background, sending requests in parallel
# would not improve performance. However, in actual experiments, this parallel
# loading drastically shortened the waiting time; a single jobs.insert seems to take about 50 sec.
@@ -116,10 +118,10 @@
[idx, response]
end
end
ThreadsWait.all_waits(*threads) do |th|
idx, response = th.value # raise errors occurred in threads
- responses[idx] = response
+ responses[idx] = response if idx
end
responses
end
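(The comment at the top of load_in_parallel explains the rationale: although the load job itself runs asynchronously on the BigQuery side, each jobs.insert call takes on the order of tens of seconds, so issuing one insert per thread cuts the total wall-clock time. A simplified sketch of the pattern used above, assuming a load(path, table) method like the one whose definition begins below; variable names are illustrative:)

  require 'thwait'

  threads = paths.each_with_index.map do |path, idx|
    Thread.new do
      response = load(path, table)  # one jobs.insert (and optional wait) per thread
      [idx, response]
    end
  end
  responses = Array.new(paths.size)
  ThreadsWait.all_waits(*threads) do |th|
    idx, response = th.value        # re-raises any error raised inside the thread
    responses[idx] = response if idx
  end
  responses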
def load(path, table)