lib/fluent/plugin/out_bigquery_base.rb in fluent-plugin-bigquery-2.2.0 vs lib/fluent/plugin/out_bigquery_base.rb in fluent-plugin-bigquery-2.3.0
- old
+ new
@@ -109,13 +109,10 @@
@table_schema = Fluent::BigQuery::RecordSchema.new('record')
if @schema
@table_schema.load_schema(@schema)
end
- if @schema_path
- @table_schema.load_schema(MultiJson.load(File.read(@schema_path)))
- end
formatter_config = conf.elements("format")[0]
@formatter = formatter_create(usage: 'out_bigquery_for_insert', default_type: 'json', conf: formatter_config)
end
@@ -124,10 +121,11 @@
@tables_queue = @tablelist.shuffle
@tables_mutex = Mutex.new
@fetched_schemas = {}
@last_fetch_schema_time = Hash.new(0)
+ @read_schemas = {}
end
# Unconditionally answers true.
# NOTE(review): this looks like the Fluentd plugin hook that declares the
# plugin safe to run under multiple workers -- confirm against the Fluentd
# plugin API.
def multi_workers_ready?; true; end
@@ -146,10 +144,11 @@
prevent_duplicate_load: @prevent_duplicate_load,
auto_create_table: @auto_create_table,
time_partitioning_type: @time_partitioning_type,
time_partitioning_field: @time_partitioning_field,
time_partitioning_expiration: @time_partitioning_expiration,
+ require_partition_filter: @require_partition_filter,
clustering_fields: @clustering_fields,
timeout_sec: @request_timeout_sec,
open_timeout_sec: @request_open_timeout_sec,
})
end
@@ -159,10 +158,12 @@
meta = metadata(tag, time, record)
schema =
if @fetch_schema
fetch_schema(meta)
+ elsif @schema_path
+ read_schema(meta)
else
@table_schema
end
begin
@@ -207,12 +208,29 @@
# Resolves the table whose schema should be fetched for the given metadata.
#
# Uses @fetch_schema_table when it is set, otherwise falls back to the first
# entry of @tablelist, and passes that template through extract_placeholders
# together with +metadata+.
#
# @param metadata the metadata object handed on to extract_placeholders
# @return whatever extract_placeholders returns for the chosen template
def fetch_schema_target_table(metadata)
  table_template = @fetch_schema_table || @tablelist[0]
  extract_placeholders(table_template, metadata)
end
# Returns the record schema loaded from the schema file that applies to
# +metadata+, reading each resolved path at most once.
#
# The path comes from read_schema_target_path(metadata). On the first request
# for a given path, the file is read, parsed with MultiJson.load, and loaded
# into a new Fluent::BigQuery::RecordSchema named "record"; the result is kept
# in @read_schemas keyed by the resolved path and returned on later calls.
# Errors raised by File.read or MultiJson.load are not rescued here.
+ def read_schema(metadata)
+ schema_path = read_schema_target_path(metadata)
+
+ unless @read_schemas[schema_path] # cache miss: load and remember this path's schema
+ table_schema = Fluent::BigQuery::RecordSchema.new("record")
+ table_schema.load_schema(MultiJson.load(File.read(schema_path)))
+ @read_schemas[schema_path] = table_schema
+ end
+ @read_schemas[schema_path]
+ end
+
# Produces the schema file path for +metadata+ by passing @schema_path and
# +metadata+ to extract_placeholders.
# NOTE(review): presumably this expands Fluentd placeholders (tag, time, chunk
# keys) in the configured path -- confirm against extract_placeholders.
+ def read_schema_target_path(metadata)
+ extract_placeholders(@schema_path, metadata)
+ end
+
# Picks the schema to use for the given project/dataset/metadata.
#
# - When @fetch_schema is set: returns the entry cached in @fetched_schemas
#   under "project.dataset.table" (table from fetch_schema_target_table), or
#   calls fetch_schema(metadata) if nothing is cached.
# - Otherwise, when @schema_path is set: returns the entry cached in
#   @read_schemas for the resolved path, or calls read_schema(metadata).
# - Otherwise: returns @table_schema.
#
# +project+ and +dataset+ are only used to build the @fetched_schemas key.
def get_schema(project, dataset, metadata)
if @fetch_schema
@fetched_schemas["#{project}.#{dataset}.#{fetch_schema_target_table(metadata)}"] || fetch_schema(metadata)
+ elsif @schema_path
+ @read_schemas[read_schema_target_path(metadata)] || read_schema(metadata) # read_schema also consults @read_schemas itself
else
@table_schema
end
end
end