lib/ej/core.rb in ej-0.1.1 vs lib/ej/core.rb in ej-0.1.2
- old
+ new
@@ -40,23 +40,26 @@
body = { size: 0, "aggs"=>{ term + "_count"=>{"cardinality"=>{"field"=>term}}}}
body[:query] = { query_string: { query: query } } unless query.nil?
@client.search index: @index, type: type, body: body
end
- def copy(source, dest, query, per_size, proc_num)
+ def copy(source, dest, query, per_size, proc_num, define_from = 0)
per = per_size || DEFAULT_PER
- num = 0
logger = Logger.new($stdout)
source_client = Elasticsearch::Client.new hosts: source, index: @index
dest_client = Elasticsearch::Client.new hosts: dest
calculate_body = { size: 0 }
calculate_body[:query] = { query_string: { query: query } } unless query.nil?
calculate_data = Hashie::Mash.new(source_client.search index: @index, body: calculate_body)
total = calculate_data.hits.total
payloads = ((total/per) + 1).times.to_a
Parallel.map(payloads, in_processes: proc_num) do |num|
from = num * per
+ if from < define_from
+ logger.info("skip index (#{num} #{from}-#{from + per})/#{total}")
+ next
+ end
body = { size: per, from: from }
body[:query] = { query_string: { query: query } } unless query.nil?
data = Hashie::Mash.new(source_client.search index: @index, body: body)
docs = data.hits.hits
bulk_message = []
@@ -69,10 +72,10 @@
bulk_message << { index: doc.to_h }
bulk_message << source
end
send_with_retry(dest_client, bulk_message)
- logger.info("copy complete (#{from}-#{from + docs.size})/#{total}")
+ logger.info("copy complete (#{num} #{from}-#{from + docs.size})/#{total}")
end
end
def dump(query, per_size)
per = per_size || DEFAULT_PER