lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.2.0 vs lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.2.1

- old
+ new

@@ -18,20 +18,24 @@ "retry_on_failure" => config.param("retry_on_failure", :integer, default: 5), "per_size" => config.param("per_size", :integer, default: 1000), "limit_size" => config.param("limit_size", :integer, default: nil), "fields" => config.param("fields", :array, default: nil), "queries" => config.param("queries", :array), - "sort" => config.param("sort", :hash, default: nil) + "sort" => config.param("sort", :hash, default: nil), + "add_query_to_record" => config.param("add_query_to_record", :bool, default: false) } # TODO: want max_threads define_num_threads = config.param("num_threads", :integer, default: 1) task['slice_queries'] = get_slice_from_num_threads(task['queries'], define_num_threads) columns = [] task['fields'].each_with_index{ |field, i| columns << Column.new(i, field['name'], field['type'].to_sym) } + if task['add_query_to_record'] + columns << Column.new(task['fields'].size, "query", :string) + end resume(task, columns, task['slice_queries'].size, &control) end def self.get_slice_from_num_threads(array, define_num_threads) @@ -79,10 +83,11 @@ @index_type = task['index_type'] @per_size = task['per_size'] @limit_size = task['limit_size'] @fields = task['fields'] @sort = task['sort'] + @add_query_to_record = task['add_query_to_record'] end def run @queries.each do |query| query_count = 0 @@ -94,9 +99,12 @@ size = get_size(next_results_size, now_results_size ,total_count) break if size == 0 results = get_sources(search(@index_type, query, size, now_results_size, @routing, @fields, @sort), @fields) results.each do |record| + if @add_query_to_record + record << query + end page_builder.add(record) end break if last_query?(next_results_size ,total_count) query_count += 1 end