lib/ej/core.rb in ej-0.1.10 vs lib/ej/core.rb in ej-0.1.11
- old
+ new
@@ -145,30 +145,30 @@
bulk_message << record
end
connect_with_retry { @client.bulk body: bulk_message unless bulk_message.empty? }
end
- def copy(source, dest, query, per_size, scroll)
+ def copy(source, dest, query, per_size, scroll, dest_index)
source_client = Elasticsearch::Client.new hosts: source
dest_client = Elasticsearch::Client.new hosts: dest
scroll_option = get_scroll_option(@index, query, per_size, scroll)
r = connect_with_retry { source_client.search(scroll_option) }
total = r['hits']['total']
i = 0
- i += bulk_results(r, dest_client, i, total)
+ i += bulk_results(r, dest_client, i, total, dest_index)
while r = connect_with_retry { source_client.scroll(scroll_id: r['_scroll_id'], scroll: scroll) } and
(not r['hits']['hits'].empty?) do
- i += bulk_results(r, dest_client, i, total)
+ i += bulk_results(r, dest_client, i, total, dest_index)
end
end
private
- def bulk_results(results, dest_client, before_size, total)
- bulk_message = convert_results(results)
+ def bulk_results(results, dest_client, before_size, total, dest_index)
+ bulk_message = convert_results(results, dest_index)
connect_with_retry do
dest_client.bulk body: bulk_message unless bulk_message.empty?
to_size = before_size + (bulk_message.size/2)
@logger.info "copy complete (#{before_size}-#{to_size})/#{total}"
end
@@ -180,19 +180,20 @@
body[:query] = { query_string: { query: query } } unless query.nil?
search_option = { index: index, scroll: scroll, body: body, size: (size || DEFAULT_PER) }
search_option
end
- def convert_results(search_results)
+ def convert_results(search_results, dest_index)
data = HashWrapper.new(search_results)
docs = data.hits.hits
bulk_message = []
docs.each do |doc|
source = doc.delete('_source')
doc.delete('_score')
['_id', '_type', '_index'].each do |meta_field|
source.delete(meta_field)
end
+ doc._index = dest_index if dest_index
bulk_message << { index: doc.to_h }
bulk_message << source
end
bulk_message
end