lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.2.0 vs lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.2.1
- old
+ new
@@ -18,20 +18,24 @@
"retry_on_failure" => config.param("retry_on_failure", :integer, default: 5),
"per_size" => config.param("per_size", :integer, default: 1000),
"limit_size" => config.param("limit_size", :integer, default: nil),
"fields" => config.param("fields", :array, default: nil),
"queries" => config.param("queries", :array),
- "sort" => config.param("sort", :hash, default: nil)
+ "sort" => config.param("sort", :hash, default: nil),
+ "add_query_to_record" => config.param("add_query_to_record", :bool, default: false)
}
# TODO: want max_threads
define_num_threads = config.param("num_threads", :integer, default: 1)
task['slice_queries'] = get_slice_from_num_threads(task['queries'], define_num_threads)
columns = []
task['fields'].each_with_index{ |field, i|
columns << Column.new(i, field['name'], field['type'].to_sym)
}
+ if task['add_query_to_record']
+ columns << Column.new(task['fields'].size, "query", :string)
+ end
resume(task, columns, task['slice_queries'].size, &control)
end
def self.get_slice_from_num_threads(array, define_num_threads)
@@ -79,10 +83,11 @@
@index_type = task['index_type']
@per_size = task['per_size']
@limit_size = task['limit_size']
@fields = task['fields']
@sort = task['sort']
+ @add_query_to_record = task['add_query_to_record']
end
def run
@queries.each do |query|
query_count = 0
@@ -94,9 +99,12 @@
size = get_size(next_results_size, now_results_size ,total_count)
break if size == 0
results = get_sources(search(@index_type, query, size, now_results_size, @routing, @fields, @sort), @fields)
results.each do |record|
+ if @add_query_to_record
+ record << query
+ end
page_builder.add(record)
end
break if last_query?(next_results_size ,total_count)
query_count += 1
end