lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.4.0 vs lib/embulk/output/bigquery/bigquery_client.rb in embulk-output-bigquery-0.4.1

- old
+ new

@@ -192,10 +192,14 @@ allow_quoted_newlines: @task['allow_quoted_newlines'], } } } + if @task['schema_update_options'] + body[:configuration][:load][:schema_update_options] = @task['schema_update_options'] + end + opts = { upload_source: path, content_type: "application/octet-stream", # options: { # retries: @task['retries'], @@ -252,10 +256,14 @@ }, } } } + if @task['schema_update_options'] + body[:configuration][:load][:schema_update_options] = @task['schema_update_options'] + end + opts = {} Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" } response = with_network_retry { client.insert_job(@project, body, opts) } wait_load('Copy', response) rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e @@ -369,28 +377,36 @@ } raise Error, "failed to get dataset #{@project}:#{dataset}, response:#{response}" end end - def create_table(table, dataset: nil, options: {}) + def create_table(table, dataset: nil, options: nil) begin - table = Helper.chomp_partition_decorator(table) dataset ||= @dataset + options ||= {} + options['time_partitioning'] ||= @task['time_partitioning'] + if Helper.has_partition_decorator?(table) + options['time_partitioning'] ||= {'type' => 'DAY'} + table = Helper.chomp_partition_decorator(table) + end + Embulk.logger.info { "embulk-output-bigquery: Create table... #{@project}:#{dataset}.#{table}" } body = { table_reference: { table_id: table, }, schema: { fields: fields, } } + if options['time_partitioning'] body[:time_partitioning] = { type: options['time_partitioning']['type'], expiration_ms: options['time_partitioning']['expiration_ms'], } end + opts = {} Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{body}, #{opts})" } with_network_retry { client.insert_table(@project, dataset, body, opts) } rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e if e.status_code == 409 && /Already Exists:/ =~ e.message