lib/fluent/plugin/out_bigquery_base.rb in fluent-plugin-bigquery-2.2.0 vs lib/fluent/plugin/out_bigquery_base.rb in fluent-plugin-bigquery-2.3.0

- old
+ new

@@ -109,13 +109,10 @@ @table_schema = Fluent::BigQuery::RecordSchema.new('record') if @schema @table_schema.load_schema(@schema) end - if @schema_path - @table_schema.load_schema(MultiJson.load(File.read(@schema_path))) - end formatter_config = conf.elements("format")[0] @formatter = formatter_create(usage: 'out_bigquery_for_insert', default_type: 'json', conf: formatter_config) end @@ -124,10 +121,11 @@ @tables_queue = @tablelist.shuffle @tables_mutex = Mutex.new @fetched_schemas = {} @last_fetch_schema_time = Hash.new(0) + @read_schemas = {} end def multi_workers_ready? true end @@ -146,10 +144,11 @@ prevent_duplicate_load: @prevent_duplicate_load, auto_create_table: @auto_create_table, time_partitioning_type: @time_partitioning_type, time_partitioning_field: @time_partitioning_field, time_partitioning_expiration: @time_partitioning_expiration, + require_partition_filter: @require_partition_filter, clustering_fields: @clustering_fields, timeout_sec: @request_timeout_sec, open_timeout_sec: @request_open_timeout_sec, }) end @@ -159,10 +158,12 @@ meta = metadata(tag, time, record) schema = if @fetch_schema fetch_schema(meta) + elsif @schema_path + read_schema(meta) else @table_schema end begin @@ -207,12 +208,29 @@ def fetch_schema_target_table(metadata) extract_placeholders(@fetch_schema_table || @tablelist[0], metadata) end + def read_schema(metadata) + schema_path = read_schema_target_path(metadata) + + unless @read_schemas[schema_path] + table_schema = Fluent::BigQuery::RecordSchema.new("record") + table_schema.load_schema(MultiJson.load(File.read(schema_path))) + @read_schemas[schema_path] = table_schema + end + @read_schemas[schema_path] + end + + def read_schema_target_path(metadata) + extract_placeholders(@schema_path, metadata) + end + def get_schema(project, dataset, metadata) if @fetch_schema @fetched_schemas["#{project}.#{dataset}.#{fetch_schema_target_table(metadata)}"] || fetch_schema(metadata) + elsif @schema_path + @read_schemas[read_schema_target_path(metadata)] || read_schema(metadata) else @table_schema end end end