lib/ej/core.rb in ej-0.1.1 vs lib/ej/core.rb in ej-0.1.2

- lines prefixed with "-" appear only in ej-0.1.1 (old version)
+ lines prefixed with "+" appear only in ej-0.1.2 (new version)

@@ -40,23 +40,26 @@ body = { size: 0, "aggs"=>{ term + "_count"=>{"cardinality"=>{"field"=>term}}}} body[:query] = { query_string: { query: query } } unless query.nil? @client.search index: @index, type: type, body: body end - def copy(source, dest, query, per_size, proc_num) + def copy(source, dest, query, per_size, proc_num, define_from = 0) per = per_size || DEFAULT_PER - num = 0 logger = Logger.new($stdout) source_client = Elasticsearch::Client.new hosts: source, index: @index dest_client = Elasticsearch::Client.new hosts: dest calculate_body = { size: 0 } calculate_body[:query] = { query_string: { query: query } } unless query.nil? calculate_data = Hashie::Mash.new(source_client.search index: @index, body: calculate_body) total = calculate_data.hits.total payloads = ((total/per) + 1).times.to_a Parallel.map(payloads, in_processes: proc_num) do |num| from = num * per + if from < define_from + logger.info("skip index (#{num} #{from}-#{from + per})/#{total}") + next + end body = { size: per, from: from } body[:query] = { query_string: { query: query } } unless query.nil? data = Hashie::Mash.new(source_client.search index: @index, body: body) docs = data.hits.hits bulk_message = [] @@ -69,10 +72,10 @@ bulk_message << { index: doc.to_h } bulk_message << source end send_with_retry(dest_client, bulk_message) - logger.info("copy complete (#{from}-#{from + docs.size})/#{total}") + logger.info("copy complete (#{num} #{from}-#{from + docs.size})/#{total}") end end def dump(query, per_size) per = per_size || DEFAULT_PER