lib/google/cloud/bigquery/dataset.rb in google-cloud-bigquery-1.1.0 vs lib/google/cloud/bigquery/dataset.rb in google-cloud-bigquery-1.2.0

- old
+ new

@@ -264,12 +264,12 @@
  ##
  # The geographic location where the dataset should reside. Possible
  # values include `EU` and `US`. The default value is `US`.
  #
- # @return [String, nil] The location code, or `nil` if the object is a
- #   reference (see {#reference?}).
+ # @return [String, nil] The geographic location, or `nil` if the object
+ #   is a reference (see {#reference?}).
  #
  # @!group Attributes
  #
  def location
    return nil if reference?

@@ -694,10 +694,16 @@
  # | `STRUCT` | `Hash` | Hash keys may be strings or symbols. |
  #
  # See [Data Types](https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types)
  # for an overview of each BigQuery data type, including allowed values.
  #
+ # The geographic location for the job ("US", "EU", etc.) can be set via
+ # {QueryJob::Updater#location=} in a block passed to this method. If the
+ # dataset is a full resource representation (see {#resource_full?}), the
+ # location of the job will be automatically set to the location of the
+ # dataset.
+ #
  # @param [String] query A query string, following the BigQuery [query
  #   syntax](https://cloud.google.com/bigquery/query-reference), of the
  #   query to execute. Example: "SELECT count(f1) FROM
  #   [myProjectId:myDatasetId.myTableId]".
  # @param [Array, Hash] params Standard SQL only. Used to pass query

@@ -759,15 +765,10 @@
  #   to be set.
  # @param [Boolean] flatten This option is specific to Legacy SQL.
  #   Flattens all nested and repeated fields in the query results. The
  #   default value is `true`. `large_results` parameter must be `true` if
  #   this is set to `false`.
- # @param [Integer] maximum_billing_tier Limits the billing tier for this
- #   job. Queries that have resource usage beyond this tier will fail
- #   (without incurring a charge). Optional. If unspecified, this will be
- #   set to your project default. For more information, see [High-Compute
- #   queries](https://cloud.google.com/bigquery/pricing#high-compute).
  # @param [Integer] maximum_bytes_billed Limits the bytes billed for this
  #   job. Queries that will have bytes billed beyond this limit will fail
  #   (without incurring a charge). Optional. If unspecified, this will be
  #   set to your project default.
  # @param [String] job_id A user-defined ID for the query job. The ID

@@ -797,10 +798,15 @@
  #   Google Cloud Storage URI (`gs://bucket/path`), or an inline resource
  #   that contains code for a user-defined function (UDF). Providing an
  #   inline code resource is equivalent to providing a URI for a file
  #   containing the same code. See [User-Defined
  #   Functions](https://cloud.google.com/bigquery/docs/reference/standard-sql/user-defined-functions).
+ # @param [Integer] maximum_billing_tier Deprecated: Change the billing
+ #   tier to allow high-compute queries.
+ # @yield [job] a job configuration object
+ # @yieldparam [Google::Cloud::Bigquery::QueryJob::Updater] job a job
+ #   configuration object for setting additional options for the query.
  #
  # @return [Google::Cloud::Bigquery::QueryJob] A new query job object.
  #
  # @example Query using standard SQL:
  #   require "google/cloud/bigquery"
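A usage sketch of the block form documented above, with placeholder dataset and table names; the only setter used is {QueryJob::Updater#location=}, which the new docs name explicitly:

    require "google/cloud/bigquery"

    bigquery = Google::Cloud::Bigquery.new
    dataset  = bigquery.dataset "my_dataset"

    # Options that used to be keyword arguments can now also be set on the
    # yielded QueryJob::Updater; location= is only needed when it cannot be
    # inferred from the dataset resource.
    job = dataset.query_job "SELECT name FROM my_table" do |j|
      j.location = "EU"
    end

    job.wait_until_done!
    job.data.each { |row| puts row[:name] } unless job.failed?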
@@ -863,11 +869,11 @@
  #       job.data.each do |row|
  #         puts row[:name]
  #       end
  #     end
  #
- # @example Query using external data source:
+ # @example Query using external data source, set destination:
  #   require "google/cloud/bigquery"
  #
  #   bigquery = Google::Cloud::Bigquery.new
  #   dataset = bigquery.dataset "my_dataset"
  #

@@ -875,12 +881,14 @@
  #   csv_table = dataset.external csv_url do |csv|
  #     csv.autodetect = true
  #     csv.skip_leading_rows = 1
  #   end
  #
- #   job = dataset.query_job "SELECT * FROM my_ext_table",
- #                           external: { my_ext_table: csv_table }
+ #   job = dataset.query_job "SELECT * FROM my_ext_table" do |query|
+ #     query.external = { my_ext_table: csv_table }
+ #     query.table = dataset.table "my_table", skip_lookup: true
+ #   end
  #
  #   job.wait_until_done!
  #   if !job.failed?
  #     job.data.each do |row|
  #       puts row[:name]

@@ -893,21 +901,27 @@
                 priority: "INTERACTIVE", cache: true, table: nil,
                 create: nil, write: nil, standard_sql: nil,
                 legacy_sql: nil, large_results: nil, flatten: nil,
                 maximum_billing_tier: nil, maximum_bytes_billed: nil,
                 job_id: nil, prefix: nil, labels: nil, udfs: nil
+   ensure_service!
    options = { priority: priority, cache: cache, table: table,
                create: create, write: write,
                large_results: large_results, flatten: flatten,
                legacy_sql: legacy_sql, standard_sql: standard_sql,
                maximum_billing_tier: maximum_billing_tier,
                maximum_bytes_billed: maximum_bytes_billed,
-               params: params, external: external, labels: labels,
-               job_id: job_id, prefix: prefix, udfs: udfs }
-   options[:dataset] ||= self
-   ensure_service!
-   gapi = service.query_job query, options
+               job_id: job_id, prefix: prefix, params: params,
+               external: external, labels: labels, udfs: udfs }
+
+   updater = QueryJob::Updater.from_options service, query, options
+   updater.dataset = self
+   updater.location = location if location # may be dataset reference
+
+   yield updater if block_given?
+
+   gapi = service.query_job updater.to_gapi
    Job.from_gapi gapi, service
  end

  ##
  # Queries data and waits for the results. In this method, a {QueryJob}

@@ -936,10 +950,16 @@
  # | `STRUCT` | `Hash` | Hash keys may be strings or symbols. |
  #
  # See [Data Types](https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types)
  # for an overview of each BigQuery data type, including allowed values.
  #
+ # The geographic location for the job ("US", "EU", etc.) can be set via
+ # {QueryJob::Updater#location=} in a block passed to this method. If the
+ # dataset is a full resource representation (see {#resource_full?}), the
+ # location of the job will be automatically set to the location of the
+ # dataset.
+ #
  # @see https://cloud.google.com/bigquery/querying-data Querying Data
  #
  # @param [String] query A query string, following the BigQuery [query
  #   syntax](https://cloud.google.com/bigquery/query-reference), of the
  #   query to execute. Example: "SELECT count(f1) FROM

@@ -983,10 +1003,13 @@
  #   BigQuery's [standard
  #   SQL](https://cloud.google.com/bigquery/docs/reference/standard-sql/)
  #   When set to false, the values of `large_results` and `flatten` are
  #   ignored; the query will be run as if `large_results` is true and
  #   `flatten` is false. Optional. The default value is false.
+ # @yield [job] a job configuration object
+ # @yieldparam [Google::Cloud::Bigquery::QueryJob::Updater] job a job
+ #   configuration object for setting additional options for the query.
  #
  # @return [Google::Cloud::Bigquery::Data] A new data object.
  #
  # @example Query using standard SQL:
  #   require "google/cloud/bigquery"
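The synchronous #query method documents the same yielded QueryJob::Updater. A minimal sketch under the same placeholder names:

    require "google/cloud/bigquery"

    bigquery = Google::Cloud::Bigquery.new
    dataset  = bigquery.dataset "my_dataset"

    # query accepts the same block; it still blocks until the job finishes
    # and returns a Data object.
    data = dataset.query "SELECT name FROM my_table" do |q|
      q.location = "EU"
    end

    data.each { |row| puts row[:name] }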
@@ -1037,11 +1060,11 @@
  #
  #   data.each do |row|
  #     puts row[:name]
  #   end
  #
- # @example Query using external data source:
+ # @example Query using external data source, set destination:
  #   require "google/cloud/bigquery"
  #
  #   bigquery = Google::Cloud::Bigquery.new
  #   dataset = bigquery.dataset "my_dataset"
  #

@@ -1049,38 +1072,38 @@
  #   csv_table = dataset.external csv_url do |csv|
  #     csv.autodetect = true
  #     csv.skip_leading_rows = 1
  #   end
  #
- #   data = dataset.query "SELECT * FROM my_ext_table",
- #                        external: { my_ext_table: csv_table }
+ #   data = dataset.query "SELECT * FROM my_ext_table" do |query|
+ #     query.external = { my_ext_table: csv_table }
+ #     query.table = dataset.table "my_table", skip_lookup: true
+ #   end
  #
  #   data.each do |row|
  #     puts row[:name]
  #   end
  #
  # @!group Data
  #
  def query query, params: nil, external: nil, max: nil, cache: true,
            standard_sql: nil, legacy_sql: nil
    ensure_service!
-   options = { params: params, external: external, cache: cache,
-               legacy_sql: legacy_sql, standard_sql: standard_sql }
+   options = { priority: "INTERACTIVE", external: external, cache: cache,
+               legacy_sql: legacy_sql, standard_sql: standard_sql,
+               params: params }
+   options[:dataset] ||= self
+   updater = QueryJob::Updater.from_options service, query, options
+   updater.location = location if location # may be dataset reference

-   job = query_job query, options
+   yield updater if block_given?
+
+   gapi = service.query_job updater.to_gapi
+   job = Job.from_gapi gapi, service
    job.wait_until_done!
+   ensure_job_succeeded! job

-   if job.failed?
-     begin
-       # raise to activate ruby exception cause handling
-       raise job.gapi_error
-     rescue StandardError => e
-       # wrap Google::Apis::Error with Google::Cloud::Error
-       raise Google::Cloud::Error.from_error(e)
-     end
-   end
-
    job.data max: max
  end

  ##
  # Creates a new External::DataSource (or subclass) object that

@@ -1145,14 +1168,21 @@
  # For the source of the data, you can pass a google-cloud storage file
  # path or a google-cloud-storage `File` instance. Or, you can upload a
  # file directly. See [Loading Data with a POST
  # Request](https://cloud.google.com/bigquery/loading-data-post-request#multipart).
  #
+ # The geographic location for the job ("US", "EU", etc.) can be set via
+ # {LoadJob::Updater#location=} in a block passed to this method. If the
+ # dataset is a full resource representation (see {#resource_full?}), the
+ # location of the job will be automatically set to the location of the
+ # dataset.
+ #
  # @param [String] table_id The destination table to load the data into.
- # @param [File, Google::Cloud::Storage::File, String, URI] file A file
- #   or the URI of a Google Cloud Storage file containing data to load
- #   into the table.
+ # @param [File, Google::Cloud::Storage::File, String, URI,
+ #   Array<Google::Cloud::Storage::File, String, URI>] files
+ #   A file or the URI of a Google Cloud Storage file, or an Array of
+ #   those, containing data to load into the table.
  # @param [String] format The exported file format. The default value is
  #   `csv`.
  #
  #   The following values are supported:
  #

@@ -1267,17 +1297,16 @@
  #   contain lowercase letters, numeric characters, underscores and
  #   dashes. International characters are allowed. Label values are
  #   optional. Label keys must start with a letter and each label in the
  #   list must have a different key.
  #
- # @yield [schema] A block for setting the schema for the destination
- #   table. The schema can be omitted if the destination table already
- #   exists, or if you're loading data from a Google Cloud Datastore
- #   backup.
- # @yieldparam [Google::Cloud::Bigquery::Schema] schema The schema
- #   instance provided using the `schema` option, or a new, empty schema
- #   instance
+ # @yield [updater] A block for setting the schema and other
+ #   options for the destination table. The schema can be omitted if the
+ #   destination table already exists, or if you're loading data from a
+ #   Google Cloud Datastore backup.
+ # @yieldparam [Google::Cloud::Bigquery::LoadJob::Updater] updater An
+ #   updater to modify the load job and its schema.
  #
  # @return [Google::Cloud::Bigquery::LoadJob] A new load job object.
  #
  # @example
  #   require "google/cloud/bigquery"

@@ -1310,10 +1339,29 @@
  #       nested_schema.string "place", mode: :required
  #       nested_schema.integer "number_of_years", mode: :required
  #     end
  #   end
  #
+ # @example Pass a list of google-cloud-storage files:
+ #   require "google/cloud/bigquery"
+ #   require "google/cloud/storage"
+ #
+ #   bigquery = Google::Cloud::Bigquery.new
+ #   dataset = bigquery.dataset "my_dataset"
+ #
+ #   storage = Google::Cloud::Storage.new
+ #   bucket = storage.bucket "my-bucket"
+ #   file = bucket.file "file-name.csv"
+ #   list = [file, "gs://my-bucket/file-name2.csv"]
+ #   load_job = dataset.load_job "my_new_table", list do |schema|
+ #     schema.string "first_name", mode: :required
+ #     schema.record "cities_lived", mode: :repeated do |nested_schema|
+ #       nested_schema.string "place", mode: :required
+ #       nested_schema.integer "number_of_years", mode: :required
+ #     end
+ #   end
+ #
  # @example Upload a file directly:
  #   require "google/cloud/bigquery"
  #
  #   bigquery = Google::Cloud::Bigquery.new
  #   dataset = bigquery.dataset "my_dataset"

@@ -1331,43 +1379,45 @@
  #   require "google/cloud/bigquery"
  #
  #   bigquery = Google::Cloud::Bigquery.new
  #   dataset = bigquery.dataset "my_dataset"
  #
- #   load_job = dataset.load_job "my_new_table",
- #                               "gs://my-bucket/xxxx.kind_name.backup_info",
- #                               format: "datastore_backup"
+ #   load_job = dataset.load_job(
+ #     "my_new_table",
+ #     "gs://my-bucket/xxxx.kind_name.backup_info") do |j|
+ #     j.format = "datastore_backup"
+ #   end
  #
  # @!group Data
  #
- def load_job table_id, file, format: nil, create: nil, write: nil,
+ def load_job table_id, files, format: nil, create: nil, write: nil,
               projection_fields: nil, jagged_rows: nil,
               quoted_newlines: nil, encoding: nil, delimiter: nil,
               ignore_unknown: nil, max_bad_records: nil, quote: nil,
               skip_leading: nil, dryrun: nil, schema: nil, job_id: nil,
               prefix: nil, labels: nil, autodetect: nil, null_marker: nil
    ensure_service!
-   if block_given?
-     schema ||= Schema.from_gapi
-     yield schema
-   end
-   schema_gapi = schema.to_gapi if schema
+   updater = load_job_updater table_id,
+                              format: format, create: create,
+                              write: write,
+                              projection_fields: projection_fields,
+                              jagged_rows: jagged_rows,
+                              quoted_newlines: quoted_newlines,
+                              encoding: encoding,
+                              delimiter: delimiter,
+                              ignore_unknown: ignore_unknown,
+                              max_bad_records: max_bad_records,
+                              quote: quote, skip_leading: skip_leading,
+                              dryrun: dryrun, schema: schema,
+                              job_id: job_id, prefix: prefix,
+                              labels: labels, autodetect: autodetect,
+                              null_marker: null_marker

-   options = { format: format, create: create, write: write,
-               projection_fields: projection_fields,
-               jagged_rows: jagged_rows,
-               quoted_newlines: quoted_newlines, encoding: encoding,
-               delimiter: delimiter, ignore_unknown: ignore_unknown,
-               max_bad_records: max_bad_records, quote: quote,
-               skip_leading: skip_leading, dryrun: dryrun,
-               schema: schema_gapi, job_id: job_id, prefix: prefix,
-               labels: labels, autodetect: autodetect,
-               null_marker: null_marker }
-   return load_storage(table_id, file, options) if storage_url? file
-   return load_local(table_id, file, options) if local_file? file
-   raise Google::Cloud::Error, "Don't know how to load #{file}"
+   yield updater if block_given?
+
+   load_local_or_uri files, updater
  end

  ##
  # Loads data into the provided destination table using a synchronous
  # method that blocks for a response. Timeouts and transient errors are
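A sketch combining the two load_job changes above (an Array of sources plus the yielded LoadJob::Updater); bucket and table names are placeholders, and the setters are the ones this diff wires up in the examples and in load_job_csv_options!:

    require "google/cloud/bigquery"

    bigquery = Google::Cloud::Bigquery.new
    dataset  = bigquery.dataset "my_dataset"

    # files may now be an Array mixing Storage::File objects and gs:// URIs.
    sources = ["gs://my-bucket/part-0.csv", "gs://my-bucket/part-1.csv"]

    load_job = dataset.load_job "my_new_table", sources do |job|
      job.location     = "EU" # optional; see LoadJob::Updater#location=
      job.skip_leading = 1
      job.string "first_name", mode: :required
    end
    load_job.wait_until_done!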
@@ -1377,14 +1427,21 @@
  # For the source of the data, you can pass a google-cloud storage file
  # path or a google-cloud-storage `File` instance. Or, you can upload a
  # file directly. See [Loading Data with a POST
  # Request](https://cloud.google.com/bigquery/loading-data-post-request#multipart).
  #
+ # The geographic location for the job ("US", "EU", etc.) can be set via
+ # {LoadJob::Updater#location=} in a block passed to this method. If the
+ # dataset is a full resource representation (see {#resource_full?}), the
+ # location of the job will be automatically set to the location of the
+ # dataset.
+ #
  # @param [String] table_id The destination table to load the data into.
- # @param [File, Google::Cloud::Storage::File, String, URI] file A file
- #   or the URI of a Google Cloud Storage file containing data to load
- #   into the table.
+ # @param [File, Google::Cloud::Storage::File, String, URI,
+ #   Array<Google::Cloud::Storage::File, String, URI>] files
+ #   A file or the URI of a Google Cloud Storage file, or an Array of
+ #   those, containing data to load into the table.
  # @param [String] format The exported file format. The default value is
  #   `csv`.
  #
  #   The following values are supported:
  #

@@ -1477,17 +1534,16 @@
  #
  #   See {Project#schema} for the creation of the schema for use with
  #   this option. Also note that for most use cases, the block yielded by
  #   this method is a more convenient way to configure the schema.
  #
- # @yield [schema] A block for setting the schema for the destination
- #   table. The schema can be omitted if the destination table already
- #   exists, or if you're loading data from a Google Cloud Datastore
- #   backup.
- # @yieldparam [Google::Cloud::Bigquery::Schema] schema The schema
- #   instance provided using the `schema` option, or a new, empty schema
- #   instance
+ # @yield [updater] A block for setting the schema of the destination
+ #   table and other options for the load job. The schema can be omitted
+ #   if the destination table already exists, or if you're loading data
+ #   from a Google Cloud Datastore backup.
+ # @yieldparam [Google::Cloud::Bigquery::LoadJob::Updater] updater An
+ #   updater to modify the load job and its schema.
  #
  # @return [Boolean] Returns `true` if the load job was successful.
  #
  # @example
  #   require "google/cloud/bigquery"

@@ -1520,10 +1576,29 @@
  #       nested_schema.string "place", mode: :required
  #       nested_schema.integer "number_of_years", mode: :required
  #     end
  #   end
  #
+ # @example Pass a list of google-cloud-storage files:
+ #   require "google/cloud/bigquery"
+ #   require "google/cloud/storage"
+ #
+ #   bigquery = Google::Cloud::Bigquery.new
+ #   dataset = bigquery.dataset "my_dataset"
+ #
+ #   storage = Google::Cloud::Storage.new
+ #   bucket = storage.bucket "my-bucket"
+ #   file = bucket.file "file-name.csv"
+ #   list = [file, "gs://my-bucket/file-name2.csv"]
+ #   dataset.load "my_new_table", list do |schema|
+ #     schema.string "first_name", mode: :required
+ #     schema.record "cities_lived", mode: :repeated do |nested_schema|
+ #       nested_schema.string "place", mode: :required
+ #       nested_schema.integer "number_of_years", mode: :required
+ #     end
+ #   end
+ #
  # @example Upload a file directly:
  #   require "google/cloud/bigquery"
  #
  #   bigquery = Google::Cloud::Bigquery.new
  #   dataset = bigquery.dataset "my_dataset"

@@ -1542,45 +1617,43 @@
  #
  #   bigquery = Google::Cloud::Bigquery.new
  #   dataset = bigquery.dataset "my_dataset"
  #
  #   dataset.load "my_new_table",
- #                "gs://my-bucket/xxxx.kind_name.backup_info",
- #                format: "datastore_backup"
+ #                "gs://my-bucket/xxxx.kind_name.backup_info" do |j|
+ #     j.format = "datastore_backup"
+ #   end
  #
  # @!group Data
  #
- def load table_id, file, format: nil, create: nil, write: nil,
+ def load table_id, files, format: nil, create: nil, write: nil,
           projection_fields: nil, jagged_rows: nil, quoted_newlines: nil,
           encoding: nil, delimiter: nil, ignore_unknown: nil,
           max_bad_records: nil, quote: nil, skip_leading: nil,
           schema: nil, autodetect: nil, null_marker: nil
+   ensure_service!

-   yield (schema ||= Schema.from_gapi) if block_given?
+   updater = load_job_updater table_id,
+                              format: format, create: create,
+                              write: write,
+                              projection_fields: projection_fields,
+                              jagged_rows: jagged_rows,
+                              quoted_newlines: quoted_newlines,
+                              encoding: encoding,
+                              delimiter: delimiter,
+                              ignore_unknown: ignore_unknown,
+                              max_bad_records: max_bad_records,
+                              quote: quote, skip_leading: skip_leading,
+                              schema: schema,
+                              autodetect: autodetect,
+                              null_marker: null_marker

-   options = { format: format, create: create, write: write,
-               projection_fields: projection_fields,
-               jagged_rows: jagged_rows,
-               quoted_newlines: quoted_newlines, encoding: encoding,
-               delimiter: delimiter, ignore_unknown: ignore_unknown,
-               max_bad_records: max_bad_records, quote: quote,
-               skip_leading: skip_leading, schema: schema,
-               autodetect: autodetect, null_marker: null_marker }
-   job = load_job table_id, file, options
+   yield updater if block_given?

+   job = load_local_or_uri files, updater
    job.wait_until_done!
-
-   if job.failed?
-     begin
-       # raise to activate ruby exception cause handling
-       raise job.gapi_error
-     rescue StandardError => e
-       # wrap Google::Apis::Error with Google::Cloud::Error
-       raise Google::Cloud::Error.from_error(e)
-     end
-   end
-
+   ensure_job_succeeded! job
    true
  end

  ##
  # Reloads the dataset with current data from the BigQuery service.
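Both synchronous methods above now route failures through the shared ensure_job_succeeded! helper introduced in the next hunk, so the caller-visible behavior is unchanged: failures still surface as Google::Cloud::Error with the original Google::Apis::Error preserved as the cause. A sketch of the resulting calling pattern (table name is a placeholder):

    require "google/cloud/bigquery"

    dataset = Google::Cloud::Bigquery.new.dataset "my_dataset"

    begin
      dataset.query "SELECT * FROM missing_table"
    rescue Google::Cloud::Error => e
      # The wrapped Google::Apis::Error remains available as the cause.
      warn "Query failed: #{e.message} (cause: #{e.cause.class})"
    end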
@@ -1944,32 +2017,160 @@
  # only partially loaded by a request to the API list method.
  def ensure_full_data!
    reload! if resource_partial?
  end

- def load_storage table_id, url, options = {}
+ def ensure_job_succeeded! job
+   return unless job.failed?
+   begin
+     # raise to activate ruby exception cause handling
+     raise job.gapi_error
+   rescue StandardError => e
+     # wrap Google::Apis::Error with Google::Cloud::Error
+     raise Google::Cloud::Error.from_error(e)
+   end
+ end
+
+ def load_job_gapi table_id, dryrun, job_id: nil, prefix: nil
+   job_ref = service.job_ref_from job_id, prefix
+   Google::Apis::BigqueryV2::Job.new(
+     job_reference: job_ref,
+     configuration: Google::Apis::BigqueryV2::JobConfiguration.new(
+       load: Google::Apis::BigqueryV2::JobConfigurationLoad.new(
+         destination_table: Google::Apis::BigqueryV2::TableReference.new(
+           project_id: @service.project,
+           dataset_id: dataset_id,
+           table_id: table_id
+         )
+       ),
+       dry_run: dryrun
+     )
+   )
+ end
+
+ def load_job_csv_options! job, jagged_rows: nil,
+                           quoted_newlines: nil,
+                           delimiter: nil,
+                           quote: nil, skip_leading: nil,
+                           null_marker: nil
+   job.jagged_rows = jagged_rows unless jagged_rows.nil?
+   job.quoted_newlines = quoted_newlines unless quoted_newlines.nil?
+   job.delimiter = delimiter unless delimiter.nil?
+   job.null_marker = null_marker unless null_marker.nil?
+   job.quote = quote unless quote.nil?
+   job.skip_leading = skip_leading unless skip_leading.nil?
+ end
+
+ def load_job_file_options! job, format: nil,
+                            projection_fields: nil,
+                            jagged_rows: nil, quoted_newlines: nil,
+                            encoding: nil, delimiter: nil,
+                            ignore_unknown: nil, max_bad_records: nil,
+                            quote: nil, skip_leading: nil,
+                            null_marker: nil
+   job.format = format unless format.nil?
+   unless projection_fields.nil?
+     job.projection_fields = projection_fields
+   end
+   job.encoding = encoding unless encoding.nil?
+   job.ignore_unknown = ignore_unknown unless ignore_unknown.nil?
+   job.max_bad_records = max_bad_records unless max_bad_records.nil?
+   load_job_csv_options! job, jagged_rows: jagged_rows,
+                              quoted_newlines: quoted_newlines,
+                              delimiter: delimiter,
+                              quote: quote,
+                              skip_leading: skip_leading,
+                              null_marker: null_marker
+ end
+
+ def load_job_updater table_id, format: nil, create: nil,
+                      write: nil, projection_fields: nil,
+                      jagged_rows: nil, quoted_newlines: nil,
+                      encoding: nil, delimiter: nil,
+                      ignore_unknown: nil, max_bad_records: nil,
+                      quote: nil, skip_leading: nil, dryrun: nil,
+                      schema: nil, job_id: nil, prefix: nil, labels: nil,
+                      autodetect: nil, null_marker: nil
+   new_job = load_job_gapi table_id, dryrun, job_id: job_id,
+                                             prefix: prefix
+   LoadJob::Updater.new(new_job).tap do |job|
+     job.location = location if location # may be dataset reference
+     job.create = create unless create.nil?
+     job.write = write unless write.nil?
+     job.schema = schema unless schema.nil?
+     job.autodetect = autodetect unless autodetect.nil?
+     job.labels = labels unless labels.nil?
+     load_job_file_options! job, format: format,
+                                 projection_fields: projection_fields,
+                                 jagged_rows: jagged_rows,
+                                 quoted_newlines: quoted_newlines,
+                                 encoding: encoding,
+                                 delimiter: delimiter,
+                                 ignore_unknown: ignore_unknown,
+                                 max_bad_records: max_bad_records,
+                                 quote: quote,
+                                 skip_leading: skip_leading,
+                                 null_marker: null_marker
+   end
+ end
+
+ def load_storage urls, job_gapi
    # Convert to storage URL
-   url = url.to_gs_url if url.respond_to? :to_gs_url
-   url = url.to_s if url.is_a? URI
+   urls = [urls].flatten.map do |url|
+     if url.respond_to? :to_gs_url
+       url.to_gs_url
+     elsif url.is_a? URI
+       url.to_s
+     else
+       url
+     end
+   end

-   gapi = service.load_table_gs_url dataset_id, table_id, url, options
+   unless urls.nil?
+     job_gapi.configuration.load.update! source_uris: urls
+     if job_gapi.configuration.load.source_format.nil?
+       source_format = Convert.derive_source_format_from_list urls
+       unless source_format.nil?
+         job_gapi.configuration.load.source_format = source_format
+       end
+     end
+   end
+
+   gapi = service.load_table_gs_url job_gapi
    Job.from_gapi gapi, service
  end

- def load_local table_id, file, options = {}
-   # Convert to storage URL
-   file = file.to_gs_url if file.respond_to? :to_gs_url
+ def load_local file, job_gapi
+   path = Pathname(file).to_path
+   if job_gapi.configuration.load.source_format.nil?
+     source_format = Convert.derive_source_format path
+     unless source_format.nil?
+       job_gapi.configuration.load.source_format = source_format
+     end
+   end

-   gapi = service.load_table_file dataset_id, table_id, file, options
+   gapi = service.load_table_file file, job_gapi
    Job.from_gapi gapi, service
  end

- def storage_url? file
-   file.respond_to?(:to_gs_url) ||
-     (file.respond_to?(:to_str) &&
-     file.to_str.downcase.start_with?("gs://")) ||
-     (file.is_a?(URI) &&
-     file.to_s.downcase.start_with?("gs://"))
+ def load_local_or_uri file, updater
+   job_gapi = updater.to_gapi
+   job = if local_file? file
+           load_local file, job_gapi
+         else
+           load_storage file, job_gapi
+         end
+   job
+ end
+
+ def storage_url? files
+   [files].flatten.all? do |file|
+     file.respond_to?(:to_gs_url) ||
+       (file.respond_to?(:to_str) &&
+       file.to_str.downcase.start_with?("gs://")) ||
+       (file.is_a?(URI) &&
+       file.to_s.downcase.start_with?("gs://"))
+   end
  end

  def local_file? file
    ::File.file? file
  rescue StandardError
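Taken together, the new private helpers choose between a multipart upload and a Cloud Storage load based on what the caller passed. The following stand-alone sketch restates that dispatch for illustration only; gs_url? and classify are hypothetical names defined here, not part of the library:

    # Hypothetical mirror of the storage_url?/local_file? checks above.
    def gs_url? source
      [source].flatten.all? do |f|
        f.respond_to?(:to_gs_url) || f.to_s.downcase.start_with?("gs://")
      end
    end

    def classify source
      if ::File.file? source.to_s
        :upload_local_file       # would be handled by load_local
      elsif gs_url? source
        :load_from_cloud_storage # would be handled by load_storage
      else
        :unknown
      end
    end

    puts classify "gs://my-bucket/part-0.csv"                      # single URI
    puts classify ["gs://my-bucket/a.csv", "gs://my-bucket/b.csv"] # list of URIs
    puts classify __FILE__                                         # local file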