lib/ej/core.rb in ej-0.1.10 vs lib/ej/core.rb in ej-0.1.11

- old
+ new

@@ -145,30 +145,30 @@ bulk_message << record end connect_with_retry { @client.bulk body: bulk_message unless bulk_message.empty? } end - def copy(source, dest, query, per_size, scroll) + def copy(source, dest, query, per_size, scroll, dest_index) source_client = Elasticsearch::Client.new hosts: source dest_client = Elasticsearch::Client.new hosts: dest scroll_option = get_scroll_option(@index, query, per_size, scroll) r = connect_with_retry { source_client.search(scroll_option) } total = r['hits']['total'] i = 0 - i += bulk_results(r, dest_client, i, total) + i += bulk_results(r, dest_client, i, total, dest_index) while r = connect_with_retry { source_client.scroll(scroll_id: r['_scroll_id'], scroll: scroll) } and (not r['hits']['hits'].empty?) do - i += bulk_results(r, dest_client, i, total) + i += bulk_results(r, dest_client, i, total, dest_index) end end private - def bulk_results(results, dest_client, before_size, total) - bulk_message = convert_results(results) + def bulk_results(results, dest_client, before_size, total, dest_index) + bulk_message = convert_results(results, dest_index) connect_with_retry do dest_client.bulk body: bulk_message unless bulk_message.empty? to_size = before_size + (bulk_message.size/2) @logger.info "copy complete (#{before_size}-#{to_size})/#{total}" end @@ -180,19 +180,20 @@ body[:query] = { query_string: { query: query } } unless query.nil? search_option = { index: index, scroll: scroll, body: body, size: (size || DEFAULT_PER) } search_option end - def convert_results(search_results) + def convert_results(search_results, dest_index) data = HashWrapper.new(search_results) docs = data.hits.hits bulk_message = [] docs.each do |doc| source = doc.delete('_source') doc.delete('_score') ['_id', '_type', '_index'].each do |meta_field| source.delete(meta_field) end + doc._index = dest_index if dest_index bulk_message << { index: doc.to_h } bulk_message << source end bulk_message end