lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.3.0 vs lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.3.1

- old
+ new

@@ -22,11 +22,12 @@ "per_size" => config.param("per_size", :integer, default: 1000), "limit_size" => config.param("limit_size", :integer, default: nil), "fields" => config.param("fields", :array, default: nil), "queries" => config.param("queries", :array), "sort" => config.param("sort", :hash, default: nil), - "add_query_to_record" => config.param("add_query_to_record", :bool, default: false) + "add_query_to_record" => config.param("add_query_to_record", :bool, default: false), + "scroll" => config.param("scroll", :string, default: '1m') } # TODO: want max_threads define_num_threads = config.param("num_threads", :integer, default: 1) task['slice_queries'] = InputThread.get_slice_from_num_threads(task['queries'], define_num_threads) @@ -57,10 +58,11 @@ @per_size = task['per_size'] @limit_size = task['limit_size'] @fields = task['fields'] @sort = task['sort'] @add_query_to_record = task['add_query_to_record'] + @scroll = task['scroll'] end def run search(@index_type, @per_size, @routing, @fields, @sort) page_builder.finish @@ -85,11 +87,11 @@ get_sources(r, fields).each do |result| result_proc(result, query) return if @limit_size == (i += 1) end - while r = @client.scroll(scroll_id: r['_scroll_id'], scroll: '1m') and (not r['hits']['hits'].empty?) do + while r = @client.scroll(scroll_id: r['_scroll_id'], scroll: @scroll) and (not r['hits']['hits'].empty?) do get_sources(r, fields).each do |result| result_proc(result, query) return if @limit_size == (i += 1) end end @@ -112,10 +114,10 @@ end body[:sort] = sorts else body[:sort] = ["_doc"] end - search_option = { index: @index_name, type: type, scroll: '1m', body: body, size: size } + search_option = { index: @index_name, type: type, scroll: @scroll, body: body, size: size } search_option[:_source] = fields.select{ |field| !field['metadata'] }.map { |field| field['name'] }.join(',') search_option end def get_sources(results, fields)