lib/google/cloud/bigquery/service.rb in google-cloud-bigquery-1.1.0 vs lib/google/cloud/bigquery/service.rb in google-cloud-bigquery-1.2.0

- old
+ new

@@ -243,84 +243,79 @@
    end
  end

  ##
  # Cancel the job specified by jobId.
- def cancel_job job_id
+ def cancel_job job_id, location: nil
    # The BigQuery team has told us cancelling is considered idempotent
-   execute(backoff: true) { service.cancel_job @project, job_id }
+   execute(backoff: true) do
+     service.cancel_job @project, job_id, location: location
+   end
  end

  ##
  # Returns the job specified by jobID.
- def get_job job_id
+ def get_job job_id, location: nil
    # The get operation is considered idempotent
-   execute(backoff: true) { service.get_job @project, job_id }
+   execute(backoff: true) do
+     service.get_job @project, job_id, location: location
+   end
  end

- def insert_job config
+ def insert_job config, location: nil
    job_object = API::Job.new(
-     job_reference: job_ref_from(nil, nil),
+     job_reference: job_ref_from(nil, nil, location: location),
      configuration: config
    )
    # Jobs have generated id, so this operation is considered idempotent
    execute(backoff: true) { service.insert_job @project, job_object }
  end

- def query_job query, options = {}
-   config = query_table_config(query, options)
-   # Jobs have generated id, so this operation is considered idempotent
-   execute(backoff: true) { service.insert_job @project, config }
+ def query_job query_job_gapi
+   execute backoff: true do
+     service.insert_job @project, query_job_gapi
+   end
  end

  ##
  # Returns the query data for the job
  def job_query_results job_id, options = {}
    # The get operation is considered idempotent
    execute backoff: true do
      service.get_job_query_results @project, job_id,
+       location: options.delete(:location),
        max_results: options.delete(:max),
        page_token: options.delete(:token),
        start_index: options.delete(:start),
        timeout_ms: options.delete(:timeout)
    end
  end

- def copy_table source, target, options = {}
-   # Jobs have generated id, so this operation is considered idempotent
+ def copy_table copy_job_gapi
    execute backoff: true do
-     service.insert_job @project, copy_table_config(
-       source, target, options
-     )
+     service.insert_job @project, copy_job_gapi
    end
  end

- def extract_table table, storage_files, options = {}
-   # Jobs have generated id, so this operation is considered idempotent
+ def extract_table extract_job_gapi
    execute backoff: true do
-     service.insert_job \
-       @project, extract_table_config(table, storage_files, options)
+     service.insert_job @project, extract_job_gapi
    end
  end

- def load_table_gs_url dataset_id, table_id, url, options = {}
-   # Jobs have generated id, so this operation is considered idempotent
+ def load_table_gs_url load_job_gapi
    execute backoff: true do
-     service.insert_job \
-       @project, load_table_url_config(dataset_id, table_id,
-                                       url, options)
+     service.insert_job @project, load_job_gapi
    end
  end

- def load_table_file dataset_id, table_id, file, options = {}
-   # Jobs have generated id, so this operation is considered idempotent
+ def load_table_file file, load_job_gapi
    execute backoff: true do
      service.insert_job \
-       @project, load_table_file_config(
-         dataset_id, table_id, file, options
-       ),
+       @project,
+       load_job_gapi,
        upload_source: file, content_type: mime_type_for(file)
    end
  end

  ##

@@ -350,335 +345,57 @@
      service.list_projects max_results: options[:max],
                            page_token: options[:token]
    end
  end

- def inspect
-   "#{self.class}(#{@project})"
- end
-
- protected
-
- def table_ref_from tbl
-   return nil if tbl.nil?
-   API::TableReference.new(
-     project_id: tbl.project_id,
-     dataset_id: tbl.dataset_id,
-     table_id: tbl.table_id
+ # If no job_id or prefix is given, always generate a client-side job ID
+ # anyway, for idempotent retry in the google-api-client layer.
+ # See https://cloud.google.com/bigquery/docs/managing-jobs#generate-jobid
+ def job_ref_from job_id, prefix, location: nil
+   prefix ||= "job_"
+   job_id ||= "#{prefix}#{generate_id}"
+   job_ref = API::JobReference.new(
+     project_id: @project,
+     job_id: job_id
    )
+   # BigQuery does not allow nil location, but missing is ok.
+   job_ref.location = location if location
+   job_ref
  end

+ # API object for dataset.
  def dataset_ref_from dts, pjt = nil
    return nil if dts.nil?
    if dts.respond_to? :dataset_id
-     API::DatasetReference.new(
+     Google::Apis::BigqueryV2::DatasetReference.new(
        project_id: (pjt || dts.project_id || @project),
        dataset_id: dts.dataset_id
      )
    else
-     API::DatasetReference.new(
+     Google::Apis::BigqueryV2::DatasetReference.new(
        project_id: (pjt || @project),
        dataset_id: dts
      )
    end
  end

+ def inspect
+   "#{self.class}(#{@project})"
+ end
+
+ protected
+
  # Generate a random string similar to the BigQuery service job IDs.
  def generate_id
    SecureRandom.urlsafe_base64(21)
  end

- # If no job_id or prefix is given, always generate a client-side job ID
- # anyway, for idempotent retry in the google-api-client layer.
- # See https://cloud.google.com/bigquery/docs/managing-jobs#generate-jobid
- def job_ref_from job_id, prefix
-   prefix ||= "job_"
-   job_id ||= "#{prefix}#{generate_id}"
-   API::JobReference.new(
-     project_id: @project,
-     job_id: job_id
-   )
- end
-
- def load_table_file_opts dataset_id, table_id, file, options = {}
-   path = Pathname(file).to_path
-   {
-     destination_table: Google::Apis::BigqueryV2::TableReference.new(
-       project_id: @project, dataset_id: dataset_id, table_id: table_id
-     ),
-     create_disposition: create_disposition(options[:create]),
-     write_disposition: write_disposition(options[:write]),
-     source_format: source_format(path, options[:format]),
-     projection_fields: projection_fields(options[:projection_fields]),
-     allow_jagged_rows: options[:jagged_rows],
-     allow_quoted_newlines: options[:quoted_newlines],
-     autodetect: options[:autodetect],
-     encoding: options[:encoding], field_delimiter: options[:delimiter],
-     ignore_unknown_values: options[:ignore_unknown],
-     max_bad_records: options[:max_bad_records],
-     null_marker: options[:null_marker], quote: options[:quote],
-     schema: options[:schema], skip_leading_rows: options[:skip_leading]
-   }.delete_if { |_, v| v.nil? }
- end
-
- def load_table_file_config dataset_id, table_id, file, options = {}
-   load_opts = load_table_file_opts dataset_id, table_id, file, options
-   req = API::Job.new(
-     job_reference: job_ref_from(options[:job_id], options[:prefix]),
-     configuration: API::JobConfiguration.new(
-       load: API::JobConfigurationLoad.new(load_opts),
-       dry_run: options[:dryrun]
-     )
-   )
-   req.configuration.labels = options[:labels] if options[:labels]
-   req
- end
-
- def load_table_url_opts dataset_id, table_id, url, options = {}
-   {
-     destination_table: Google::Apis::BigqueryV2::TableReference.new(
-       project_id: @project, dataset_id: dataset_id, table_id: table_id
-     ),
-     source_uris: Array(url),
-     create_disposition: create_disposition(options[:create]),
-     write_disposition: write_disposition(options[:write]),
-     source_format: source_format(url, options[:format]),
-     projection_fields: projection_fields(options[:projection_fields]),
-     allow_jagged_rows: options[:jagged_rows],
-     allow_quoted_newlines: options[:quoted_newlines],
-     autodetect: options[:autodetect],
-     encoding: options[:encoding], field_delimiter: options[:delimiter],
-     ignore_unknown_values: options[:ignore_unknown],
-     max_bad_records: options[:max_bad_records],
-     null_marker: options[:null_marker], quote: options[:quote],
-     schema: options[:schema], skip_leading_rows: options[:skip_leading]
-   }.delete_if { |_, v| v.nil? }
- end
-
- def load_table_url_config dataset_id, table_id, url, options = {}
-   load_opts = load_table_url_opts dataset_id, table_id, url, options
-   req = API::Job.new(
-     job_reference: job_ref_from(options[:job_id], options[:prefix]),
-     configuration: API::JobConfiguration.new(
-       load: API::JobConfigurationLoad.new(load_opts),
-       dry_run: options[:dryrun]
-     )
-   )
-   req.configuration.labels = options[:labels] if options[:labels]
-   req
- end
-
- # rubocop:disable all
-
- ##
- # Job description for query job
- def query_table_config query, options
-   dest_table = table_ref_from options[:table]
-   dataset_config = dataset_ref_from options[:dataset], options[:project]
-   req = API::Job.new(
-     job_reference: job_ref_from(options[:job_id], options[:prefix]),
-     configuration: API::JobConfiguration.new(
-       query: API::JobConfigurationQuery.new(
-         query: query,
-         # tableDefinitions: { ... },
-         priority: priority_value(options[:priority]),
-         use_query_cache: options[:cache],
-         destination_table: dest_table,
-         create_disposition: create_disposition(options[:create]),
-         write_disposition: write_disposition(options[:write]),
-         allow_large_results: options[:large_results],
-         flatten_results: options[:flatten],
-         default_dataset: dataset_config,
-         use_legacy_sql: Convert.resolve_legacy_sql(
-           options[:standard_sql], options[:legacy_sql]),
-         maximum_billing_tier: options[:maximum_billing_tier],
-         maximum_bytes_billed: options[:maximum_bytes_billed],
-         user_defined_function_resources: udfs(options[:udfs])
-       )
-     )
-   )
-   req.configuration.labels = options[:labels] if options[:labels]
-
-   if options[:params]
-     if Array === options[:params]
-       req.configuration.query.use_legacy_sql = false
-       req.configuration.query.parameter_mode = "POSITIONAL"
-       req.configuration.query.query_parameters = options[:params].map do |param|
-         Convert.to_query_param param
-       end
-     elsif Hash === options[:params]
-       req.configuration.query.use_legacy_sql = false
-       req.configuration.query.parameter_mode = "NAMED"
-       req.configuration.query.query_parameters = options[:params].map do |name, param|
-         Convert.to_query_param(param).tap do |named_param|
-           named_param.name = String name
-         end
-       end
-     else
-       raise "Query parameters must be an Array or a Hash."
-     end
-   end
-
-   if options[:external]
-     external_table_pairs = options[:external].map do |name, obj|
-       [String(name), obj.to_gapi]
-     end
-     external_table_hash = Hash[external_table_pairs]
-     req.configuration.query.table_definitions = external_table_hash
-   end
-
-   req
- end
-
- def query_config query, options = {}
-   dataset_config = dataset_ref_from options[:dataset], options[:project]
-
-   req = API::QueryRequest.new(
-     query: query,
-     max_results: options[:max],
-     default_dataset: dataset_config,
-     timeout_ms: options[:timeout],
-     dry_run: options[:dryrun],
-     use_query_cache: options[:cache],
-     use_legacy_sql: Convert.resolve_legacy_sql(
-       options[:standard_sql], options[:legacy_sql])
-   )
-
-   if options[:params]
-     if Array === options[:params]
-       req.use_legacy_sql = false
-       req.parameter_mode = "POSITIONAL"
-       req.query_parameters = options[:params].map do |param|
-         Convert.to_query_param param
-       end
-     elsif Hash === options[:params]
-       req.use_legacy_sql = false
-       req.parameter_mode = "NAMED"
-       req.query_parameters = options[:params].map do |name, param|
-         Convert.to_query_param(param).tap do |named_param|
-           named_param.name = String name
-         end
-       end
-     else
-       raise "Query parameters must be an Array or a Hash."
-     end
-   end
-
-   req
- end
-
- # rubocop:enable all
-
- ##
- # Job description for copy job
- def copy_table_config source, target, options = {}
-   req = API::Job.new(
-     job_reference: job_ref_from(options[:job_id], options[:prefix]),
-     configuration: API::JobConfiguration.new(
-       copy: API::JobConfigurationTableCopy.new(
-         source_table: source,
-         destination_table: target,
-         create_disposition: create_disposition(options[:create]),
-         write_disposition: write_disposition(options[:write])
-       ),
-       dry_run: options[:dryrun]
-     )
-   )
-   req.configuration.labels = options[:labels] if options[:labels]
-   req
- end
-
- def extract_table_config table, storage_files, options = {}
-   storage_urls = Array(storage_files).map do |url|
-     url.respond_to?(:to_gs_url) ? url.to_gs_url : url
-   end
-   dest_format = source_format storage_urls.first, options[:format]
-   req = API::Job.new(
-     job_reference: job_ref_from(options[:job_id], options[:prefix]),
-     configuration: API::JobConfiguration.new(
-       extract: API::JobConfigurationExtract.new(
-         destination_uris: Array(storage_urls),
-         source_table: table,
-         destination_format: dest_format,
-         compression: options[:compression],
-         field_delimiter: options[:delimiter],
-         print_header: options[:header]
-       ),
-       dry_run: options[:dryrun]
-     )
-   )
-   req.configuration.labels = options[:labels] if options[:labels]
-   req
- end
-
- def create_disposition str
-   { "create_if_needed" => "CREATE_IF_NEEDED",
-     "createifneeded" => "CREATE_IF_NEEDED",
-     "if_needed" => "CREATE_IF_NEEDED",
-     "needed" => "CREATE_IF_NEEDED",
-     "create_never" => "CREATE_NEVER",
-     "createnever" => "CREATE_NEVER",
-     "never" => "CREATE_NEVER" }[str.to_s.downcase]
- end
-
- def write_disposition str
-   { "write_truncate" => "WRITE_TRUNCATE",
-     "writetruncate" => "WRITE_TRUNCATE",
-     "truncate" => "WRITE_TRUNCATE",
-     "write_append" => "WRITE_APPEND",
-     "writeappend" => "WRITE_APPEND",
-     "append" => "WRITE_APPEND",
-     "write_empty" => "WRITE_EMPTY",
-     "writeempty" => "WRITE_EMPTY",
-     "empty" => "WRITE_EMPTY" }[str.to_s.downcase]
- end
-
- def priority_value str
-   { "batch" => "BATCH",
-     "interactive" => "INTERACTIVE" }[str.to_s.downcase]
- end
-
- def source_format path, format
-   val = {
-     "csv" => "CSV",
-     "json" => "NEWLINE_DELIMITED_JSON",
-     "newline_delimited_json" => "NEWLINE_DELIMITED_JSON",
-     "avro" => "AVRO",
-     "datastore" => "DATASTORE_BACKUP",
-     "backup" => "DATASTORE_BACKUP",
-     "datastore_backup" => "DATASTORE_BACKUP"
-   }[format.to_s.downcase]
-   return val unless val.nil?
-   return nil if path.nil?
-   return "CSV" if path.end_with? ".csv"
-   return "NEWLINE_DELIMITED_JSON" if path.end_with? ".json"
-   return "AVRO" if path.end_with? ".avro"
-   return "DATASTORE_BACKUP" if path.end_with? ".backup_info"
-   nil
- end
-
- def projection_fields array_or_str
-   Array(array_or_str) unless array_or_str.nil?
- end
-
  def mime_type_for file
    mime_type = MIME::Types.of(Pathname(file).to_path).first.to_s
    return nil if mime_type.empty?
    mime_type
  rescue StandardError
    nil
- end
-
- def udfs array_or_str
-   Array(array_or_str).map do |uri_or_code|
-     resource = API::UserDefinedFunctionResource.new
-     if uri_or_code.start_with?("gs://")
-       resource.resource_uri = uri_or_code
-     else
-       resource.inline_code = uri_or_code
-     end
-     resource
-   end
  end

  def execute backoff: nil
    if backoff
      Backoff.new(retries: retries).execute { yield }
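Editor's note: the 1.2.0 Service methods above are location-aware and expect callers to hand over prebuilt google-api-client job objects instead of option hashes. The following is a minimal, hypothetical sketch of that calling convention; the `service` variable, the "US" location and the "SELECT 1" query are assumptions for illustration, while the signatures it relies on (`get_job`/`cancel_job` with a `location:` keyword, `query_job` taking a Google::Apis::BigqueryV2::Job, and `job_ref_from` now sitting above `protected`) come straight from the diff.

require "google/apis/bigquery_v2"

# Sketch only: assumes `service` is an already-constructed
# Google::Cloud::Bigquery::Service for some project.
query_job_gapi = Google::Apis::BigqueryV2::Job.new(
  # job_ref_from generates a client-side job ID and, in 1.2.0, can carry a location.
  job_reference: service.job_ref_from(nil, nil, location: "US"),
  configuration: Google::Apis::BigqueryV2::JobConfiguration.new(
    query: Google::Apis::BigqueryV2::JobConfigurationQuery.new(
      query: "SELECT 1",
      use_legacy_sql: false
    )
  )
)

inserted = service.query_job query_job_gapi        # inserts the prebuilt job gapi
job_id   = inserted.job_reference.job_id
fetched  = service.get_job job_id, location: "US"  # location keyword is new in 1.2.0
service.cancel_job job_id, location: "US"

The same pattern applies to copy_table, extract_table, load_table_gs_url and load_table_file: the per-option builders (query_table_config, copy_table_config, the load/extract helpers and the disposition/format converters) are removed from Service in 1.2.0, so whatever assembles those gapi job objects now lives with the caller.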