lib/google/cloud/bigquery/service.rb in google-cloud-bigquery-1.1.0 vs lib/google/cloud/bigquery/service.rb in google-cloud-bigquery-1.2.0
- old
+ new
@@ -243,84 +243,79 @@
end
end
##
# Cancel the job specified by jobId.
- def cancel_job job_id
+ def cancel_job job_id, location: nil
# The BigQuery team has told us cancelling is considered idempotent
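+ # Illustrative call with a hypothetical job ID; the location keyword
+ # is typically only needed for jobs outside the US and EU multi-regions:
+ #   service.cancel_job "job_abc123", location: "asia-northeast1"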
- execute(backoff: true) { service.cancel_job @project, job_id }
+ execute(backoff: true) do
+ service.cancel_job @project, job_id, location: location
+ end
end
##
# Returns the job specified by jobId.
- def get_job job_id
+ def get_job job_id, location: nil
# The get operation is considered idempotent
- execute(backoff: true) { service.get_job @project, job_id }
+ execute(backoff: true) do
+ service.get_job @project, job_id, location: location
+ end
end
- def insert_job config
+ def insert_job config, location: nil
job_object = API::Job.new(
- job_reference: job_ref_from(nil, nil),
+ job_reference: job_ref_from(nil, nil, location: location),
configuration: config
)
# Jobs have generated id, so this operation is considered idempotent
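+ # (A retry re-sends the same client-generated job ID, so at most one
+ # job is created even if the request is repeated.)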
execute(backoff: true) { service.insert_job @project, job_object }
end
- def query_job query, options = {}
- config = query_table_config(query, options)
- # Jobs have generated id, so this operation is considered idempotent
- execute(backoff: true) { service.insert_job @project, config }
+ def query_job query_job_gapi
+ execute backoff: true do
+ service.insert_job @project, query_job_gapi
+ end
end
##
# Returns the query data for the job
def job_query_results job_id, options = {}
# The get operation is considered idempotent
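+ # Illustrative options (hypothetical job ID); each key is consumed
+ # with Hash#delete below:
+ #   job_query_results "job_abc123", location: "US", max: 1000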
execute backoff: true do
service.get_job_query_results @project,
job_id,
+ location: options.delete(:location),
max_results: options.delete(:max),
page_token: options.delete(:token),
start_index: options.delete(:start),
timeout_ms: options.delete(:timeout)
end
end
- def copy_table source, target, options = {}
- # Jobs have generated id, so this operation is considered idempotent
+ def copy_table copy_job_gapi
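+ # copy_job_gapi is a fully built Google::Apis::BigqueryV2::Job whose
+ # job reference was generated client-side, so the insert below stays
+ # idempotent under retry.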
execute backoff: true do
- service.insert_job @project, copy_table_config(
- source, target, options
- )
+ service.insert_job @project, copy_job_gapi
end
end
- def extract_table table, storage_files, options = {}
- # Jobs have generated id, so this operation is considered idempotent
+ def extract_table extract_job_gapi
execute backoff: true do
- service.insert_job \
- @project, extract_table_config(table, storage_files, options)
+ service.insert_job @project, extract_job_gapi
end
end
- def load_table_gs_url dataset_id, table_id, url, options = {}
- # Jobs have generated id, so this operation is considered idempotent
+ def load_table_gs_url load_job_gapi
execute backoff: true do
- service.insert_job \
- @project, load_table_url_config(dataset_id, table_id,
- url, options)
+ service.insert_job @project, load_job_gapi
end
end
- def load_table_file dataset_id, table_id, file, options = {}
- # Jobs have generated id, so this operation is considered idempotent
+ def load_table_file file, load_job_gapi
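+ # +file+ is used both as the upload source and to guess the
+ # Content-Type header via mime_type_for below.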
execute backoff: true do
service.insert_job \
- @project, load_table_file_config(
- dataset_id, table_id, file, options
- ),
+ @project,
+ load_job_gapi,
upload_source: file, content_type: mime_type_for(file)
end
end
##
@@ -350,335 +345,57 @@
service.list_projects max_results: options[:max],
page_token: options[:token]
end
end
- def inspect
- "#{self.class}(#{@project})"
- end
-
- protected
-
- def table_ref_from tbl
- return nil if tbl.nil?
- API::TableReference.new(
- project_id: tbl.project_id,
- dataset_id: tbl.dataset_id,
- table_id: tbl.table_id
+ # If no job_id or prefix is given, always generate a client-side job ID
+ # anyway, for idempotent retry in the google-api-client layer.
+ # See https://cloud.google.com/bigquery/docs/managing-jobs#generate-jobid
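+ # Illustrative result (the random suffix will differ):
+ #   job_ref_from(nil, nil, location: "US").job_id
+ #   # => "job_wUJ5Ck6fqknSWXalDLOD1GuuA1WA"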
+ def job_ref_from job_id, prefix, location: nil
+ prefix ||= "job_"
+ job_id ||= "#{prefix}#{generate_id}"
+ job_ref = API::JobReference.new(
+ project_id: @project,
+ job_id: job_id
)
+ # BigQuery does not accept an explicit nil location, but omitting
+ # the field is OK.
+ job_ref.location = location if location
+ job_ref
end
+ # Builds the API DatasetReference for a dataset, which may be given
+ # as an object responding to #dataset_id or as a plain dataset ID.
def dataset_ref_from dts, pjt = nil
return nil if dts.nil?
if dts.respond_to? :dataset_id
- API::DatasetReference.new(
+ Google::Apis::BigqueryV2::DatasetReference.new(
project_id: (pjt || dts.project_id || @project),
dataset_id: dts.dataset_id
)
else
- API::DatasetReference.new(
+ Google::Apis::BigqueryV2::DatasetReference.new(
project_id: (pjt || @project),
dataset_id: dts
)
end
end
+ def inspect
+ "#{self.class}(#{@project})"
+ end
+
+ protected
+
# Generate a random string similar to the BigQuery service job IDs.
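+ # SecureRandom.urlsafe_base64(21) encodes 21 random bytes as a
+ # 28-character URL-safe Base64 string (no padding, since 21 % 3 == 0).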
def generate_id
SecureRandom.urlsafe_base64(21)
end
- # If no job_id or prefix is given, always generate a client-side job ID
- # anyway, for idempotent retry in the google-api-client layer.
- # See https://cloud.google.com/bigquery/docs/managing-jobs#generate-jobid
- def job_ref_from job_id, prefix
- prefix ||= "job_"
- job_id ||= "#{prefix}#{generate_id}"
- API::JobReference.new(
- project_id: @project,
- job_id: job_id
- )
- end
-
- def load_table_file_opts dataset_id, table_id, file, options = {}
- path = Pathname(file).to_path
- {
- destination_table: Google::Apis::BigqueryV2::TableReference.new(
- project_id: @project, dataset_id: dataset_id, table_id: table_id
- ),
- create_disposition: create_disposition(options[:create]),
- write_disposition: write_disposition(options[:write]),
- source_format: source_format(path, options[:format]),
- projection_fields: projection_fields(options[:projection_fields]),
- allow_jagged_rows: options[:jagged_rows],
- allow_quoted_newlines: options[:quoted_newlines],
- autodetect: options[:autodetect],
- encoding: options[:encoding], field_delimiter: options[:delimiter],
- ignore_unknown_values: options[:ignore_unknown],
- max_bad_records: options[:max_bad_records],
- null_marker: options[:null_marker], quote: options[:quote],
- schema: options[:schema], skip_leading_rows: options[:skip_leading]
- }.delete_if { |_, v| v.nil? }
- end
-
- def load_table_file_config dataset_id, table_id, file, options = {}
- load_opts = load_table_file_opts dataset_id, table_id, file, options
- req = API::Job.new(
- job_reference: job_ref_from(options[:job_id], options[:prefix]),
- configuration: API::JobConfiguration.new(
- load: API::JobConfigurationLoad.new(load_opts),
- dry_run: options[:dryrun]
- )
- )
- req.configuration.labels = options[:labels] if options[:labels]
- req
- end
-
- def load_table_url_opts dataset_id, table_id, url, options = {}
- {
- destination_table: Google::Apis::BigqueryV2::TableReference.new(
- project_id: @project, dataset_id: dataset_id, table_id: table_id
- ),
- source_uris: Array(url),
- create_disposition: create_disposition(options[:create]),
- write_disposition: write_disposition(options[:write]),
- source_format: source_format(url, options[:format]),
- projection_fields: projection_fields(options[:projection_fields]),
- allow_jagged_rows: options[:jagged_rows],
- allow_quoted_newlines: options[:quoted_newlines],
- autodetect: options[:autodetect],
- encoding: options[:encoding], field_delimiter: options[:delimiter],
- ignore_unknown_values: options[:ignore_unknown],
- max_bad_records: options[:max_bad_records],
- null_marker: options[:null_marker], quote: options[:quote],
- schema: options[:schema], skip_leading_rows: options[:skip_leading]
- }.delete_if { |_, v| v.nil? }
- end
-
- def load_table_url_config dataset_id, table_id, url, options = {}
- load_opts = load_table_url_opts dataset_id, table_id, url, options
- req = API::Job.new(
- job_reference: job_ref_from(options[:job_id], options[:prefix]),
- configuration: API::JobConfiguration.new(
- load: API::JobConfigurationLoad.new(load_opts),
- dry_run: options[:dryrun]
- )
- )
- req.configuration.labels = options[:labels] if options[:labels]
- req
- end
-
- # rubocop:disable all
-
- ##
- # Job description for query job
- def query_table_config query, options
- dest_table = table_ref_from options[:table]
- dataset_config = dataset_ref_from options[:dataset], options[:project]
- req = API::Job.new(
- job_reference: job_ref_from(options[:job_id], options[:prefix]),
- configuration: API::JobConfiguration.new(
- query: API::JobConfigurationQuery.new(
- query: query,
- # tableDefinitions: { ... },
- priority: priority_value(options[:priority]),
- use_query_cache: options[:cache],
- destination_table: dest_table,
- create_disposition: create_disposition(options[:create]),
- write_disposition: write_disposition(options[:write]),
- allow_large_results: options[:large_results],
- flatten_results: options[:flatten],
- default_dataset: dataset_config,
- use_legacy_sql: Convert.resolve_legacy_sql(
- options[:standard_sql], options[:legacy_sql]),
- maximum_billing_tier: options[:maximum_billing_tier],
- maximum_bytes_billed: options[:maximum_bytes_billed],
- user_defined_function_resources: udfs(options[:udfs])
- )
- )
- )
- req.configuration.labels = options[:labels] if options[:labels]
-
- if options[:params]
- if Array === options[:params]
- req.configuration.query.use_legacy_sql = false
- req.configuration.query.parameter_mode = "POSITIONAL"
- req.configuration.query.query_parameters = options[:params].map do |param|
- Convert.to_query_param param
- end
- elsif Hash === options[:params]
- req.configuration.query.use_legacy_sql = false
- req.configuration.query.parameter_mode = "NAMED"
- req.configuration.query.query_parameters = options[:params].map do |name, param|
- Convert.to_query_param(param).tap do |named_param|
- named_param.name = String name
- end
- end
- else
- raise "Query parameters must be an Array or a Hash."
- end
- end
-
- if options[:external]
- external_table_pairs = options[:external].map do |name, obj|
- [String(name), obj.to_gapi]
- end
- external_table_hash = Hash[external_table_pairs]
- req.configuration.query.table_definitions = external_table_hash
- end
-
- req
- end
-
- def query_config query, options = {}
- dataset_config = dataset_ref_from options[:dataset], options[:project]
-
- req = API::QueryRequest.new(
- query: query,
- max_results: options[:max],
- default_dataset: dataset_config,
- timeout_ms: options[:timeout],
- dry_run: options[:dryrun],
- use_query_cache: options[:cache],
- use_legacy_sql: Convert.resolve_legacy_sql(
- options[:standard_sql], options[:legacy_sql])
- )
-
- if options[:params]
- if Array === options[:params]
- req.use_legacy_sql = false
- req.parameter_mode = "POSITIONAL"
- req.query_parameters = options[:params].map do |param|
- Convert.to_query_param param
- end
- elsif Hash === options[:params]
- req.use_legacy_sql = false
- req.parameter_mode = "NAMED"
- req.query_parameters = options[:params].map do |name, param|
- Convert.to_query_param(param).tap do |named_param|
- named_param.name = String name
- end
- end
- else
- raise "Query parameters must be an Array or a Hash."
- end
- end
-
- req
- end
-
- # rubocop:enable all
-
- ##
- # Job description for copy job
- def copy_table_config source, target, options = {}
- req = API::Job.new(
- job_reference: job_ref_from(options[:job_id], options[:prefix]),
- configuration: API::JobConfiguration.new(
- copy: API::JobConfigurationTableCopy.new(
- source_table: source,
- destination_table: target,
- create_disposition: create_disposition(options[:create]),
- write_disposition: write_disposition(options[:write])
- ),
- dry_run: options[:dryrun]
- )
- )
- req.configuration.labels = options[:labels] if options[:labels]
- req
- end
-
- def extract_table_config table, storage_files, options = {}
- storage_urls = Array(storage_files).map do |url|
- url.respond_to?(:to_gs_url) ? url.to_gs_url : url
- end
- dest_format = source_format storage_urls.first, options[:format]
- req = API::Job.new(
- job_reference: job_ref_from(options[:job_id], options[:prefix]),
- configuration: API::JobConfiguration.new(
- extract: API::JobConfigurationExtract.new(
- destination_uris: Array(storage_urls),
- source_table: table,
- destination_format: dest_format,
- compression: options[:compression],
- field_delimiter: options[:delimiter],
- print_header: options[:header]
- ),
- dry_run: options[:dryrun]
- )
- )
- req.configuration.labels = options[:labels] if options[:labels]
- req
- end
-
- def create_disposition str
- { "create_if_needed" => "CREATE_IF_NEEDED",
- "createifneeded" => "CREATE_IF_NEEDED",
- "if_needed" => "CREATE_IF_NEEDED",
- "needed" => "CREATE_IF_NEEDED",
- "create_never" => "CREATE_NEVER",
- "createnever" => "CREATE_NEVER",
- "never" => "CREATE_NEVER" }[str.to_s.downcase]
- end
-
- def write_disposition str
- { "write_truncate" => "WRITE_TRUNCATE",
- "writetruncate" => "WRITE_TRUNCATE",
- "truncate" => "WRITE_TRUNCATE",
- "write_append" => "WRITE_APPEND",
- "writeappend" => "WRITE_APPEND",
- "append" => "WRITE_APPEND",
- "write_empty" => "WRITE_EMPTY",
- "writeempty" => "WRITE_EMPTY",
- "empty" => "WRITE_EMPTY" }[str.to_s.downcase]
- end
-
- def priority_value str
- { "batch" => "BATCH",
- "interactive" => "INTERACTIVE" }[str.to_s.downcase]
- end
-
- def source_format path, format
- val = {
- "csv" => "CSV",
- "json" => "NEWLINE_DELIMITED_JSON",
- "newline_delimited_json" => "NEWLINE_DELIMITED_JSON",
- "avro" => "AVRO",
- "datastore" => "DATASTORE_BACKUP",
- "backup" => "DATASTORE_BACKUP",
- "datastore_backup" => "DATASTORE_BACKUP"
- }[format.to_s.downcase]
- return val unless val.nil?
- return nil if path.nil?
- return "CSV" if path.end_with? ".csv"
- return "NEWLINE_DELIMITED_JSON" if path.end_with? ".json"
- return "AVRO" if path.end_with? ".avro"
- return "DATASTORE_BACKUP" if path.end_with? ".backup_info"
- nil
- end
-
- def projection_fields array_or_str
- Array(array_or_str) unless array_or_str.nil?
- end
-
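+ # Guesses the upload Content-Type from the file's extension, e.g.
+ #   mime_type_for "data.csv" # => "text/csv"
+ # and returns nil when the type is unknown or the lookup raises.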
def mime_type_for file
mime_type = MIME::Types.of(Pathname(file).to_path).first.to_s
return nil if mime_type.empty?
mime_type
rescue StandardError
nil
- end
-
- def udfs array_or_str
- Array(array_or_str).map do |uri_or_code|
- resource = API::UserDefinedFunctionResource.new
- if uri_or_code.start_with?("gs://")
- resource.resource_uri = uri_or_code
- else
- resource.inline_code = uri_or_code
- end
- resource
- end
end
def execute backoff: nil
if backoff
Backoff.new(retries: retries).execute { yield }