lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-2.1.0 vs lib/fluent/plugin/bigquery/writer.rb in fluent-plugin-bigquery-2.2.0

- old
+ new

@@ -33,10 +33,11 @@ fields: record_schema.to_a, } } definition.merge!(time_partitioning: time_partitioning) if time_partitioning + definition.merge!(clustering: clustering) if clustering client.insert_table(project, dataset, definition, {}) log.debug "create table", project_id: project, dataset: dataset, table: table_id rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e message = e.message if e.status_code == 409 && /Already Exists:/ =~ message @@ -147,10 +148,11 @@ if e.status_code == 404 && /Not Found: Table/i =~ e.message raise Fluent::BigQuery::UnRetryableError.new("Table is not found") unless @options[:auto_create_table] raise Fluent::BigQuery::UnRetryableError.new("Schema is empty") if fields.empty? configuration[:configuration][:load].merge!(schema: {fields: fields.to_a}) configuration[:configuration][:load].merge!(time_partitioning: time_partitioning) if time_partitioning + configuration[:configuration][:load].merge!(clustering: clustering) if clustering end end res = client.insert_job( project, @@ -172,12 +174,13 @@ end def fetch_load_job(job_reference) project = job_reference.project_id job_id = job_reference.job_id + location = @options[:location] - res = client.get_job(project, job_id) + res = client.get_job(project, job_id, location: location) log.debug "load job fetched", id: job_id, state: res.status.state, **job_reference.as_hash(:project_id, :dataset_id, :table_id) if res.status.state == "DONE" res end @@ -307,13 +310,24 @@ if @options[:time_partitioning_type] @time_partitioning = { type: @options[:time_partitioning_type].to_s.upcase, field: @options[:time_partitioning_field] ? @options[:time_partitioning_field].to_s : nil, expiration_ms: @options[:time_partitioning_expiration] ? @options[:time_partitioning_expiration] * 1000 : nil, - require_partition_filter: @options[:time_partitioning_require_partition_filter], }.reject { |_, v| v.nil? } else @time_partitioning + end + end + + def clustering + return @clustering if instance_variable_defined?(:@clustering) + + if @options[:clustering_fields] + @clustering = { + fields: @options[:clustering_fields] + } + else + @clustering end end def insert_all_table_data_with_create_table(project, dataset, table_id, body, schema) try_count ||= 1