lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.3.0 vs lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.3.1
- old
+ new
@@ -22,11 +22,12 @@
"per_size" => config.param("per_size", :integer, default: 1000),
"limit_size" => config.param("limit_size", :integer, default: nil),
"fields" => config.param("fields", :array, default: nil),
"queries" => config.param("queries", :array),
"sort" => config.param("sort", :hash, default: nil),
- "add_query_to_record" => config.param("add_query_to_record", :bool, default: false)
+ "add_query_to_record" => config.param("add_query_to_record", :bool, default: false),
+ "scroll" => config.param("scroll", :string, default: '1m')
}
# TODO: want max_threads
define_num_threads = config.param("num_threads", :integer, default: 1)
task['slice_queries'] = InputThread.get_slice_from_num_threads(task['queries'], define_num_threads)
@@ -57,10 +58,11 @@
@per_size = task['per_size']
@limit_size = task['limit_size']
@fields = task['fields']
@sort = task['sort']
@add_query_to_record = task['add_query_to_record']
+ @scroll = task['scroll']
end
def run
search(@index_type, @per_size, @routing, @fields, @sort)
page_builder.finish
@@ -85,11 +87,11 @@
get_sources(r, fields).each do |result|
result_proc(result, query)
return if @limit_size == (i += 1)
end
- while r = @client.scroll(scroll_id: r['_scroll_id'], scroll: '1m') and (not r['hits']['hits'].empty?) do
+ while r = @client.scroll(scroll_id: r['_scroll_id'], scroll: @scroll) and (not r['hits']['hits'].empty?) do
get_sources(r, fields).each do |result|
result_proc(result, query)
return if @limit_size == (i += 1)
end
end
@@ -112,10 +114,10 @@
end
body[:sort] = sorts
else
body[:sort] = ["_doc"]
end
- search_option = { index: @index_name, type: type, scroll: '1m', body: body, size: size }
+ search_option = { index: @index_name, type: type, scroll: @scroll, body: body, size: size }
search_option[:_source] = fields.select{ |field| !field['metadata'] }.map { |field| field['name'] }.join(',')
search_option
end
def get_sources(results, fields)