lib/ej/core.rb in ej-0.1.11 vs lib/ej/core.rb in ej-0.1.12

- old
+ new

@@ -1,6 +1,6 @@
-require 'yajl'
+require 'json'
 require 'elasticsearch'
 require 'hashie'
 require 'parallel'
 require 'logger'
@@ -52,12 +52,12 @@
       docs = data.hits.hits
       break if docs.empty?
       docs.each do |doc|
         source = doc.delete('_source')
         doc.delete('_score')
-        bulk_message << Yajl::Encoder.encode({ 'index' => doc.to_h })
-        bulk_message << Yajl::Encoder.encode(source)
+        bulk_message << JSON.dump({ 'index' => doc.to_h })
+        bulk_message << JSON.dump(source)
       end
       num += 1
       puts bulk_message.join("\n")
     end
   end
@@ -145,41 +145,45 @@
       bulk_message << record
     end
     connect_with_retry { @client.bulk body: bulk_message unless bulk_message.empty? }
   end

-  def copy(source, dest, query, per_size, scroll, dest_index)
+  def copy(source, dest, query, per_size, scroll, dest_index, slice_max)
     source_client = Elasticsearch::Client.new hosts: source
     dest_client = Elasticsearch::Client.new hosts: dest
-    scroll_option = get_scroll_option(@index, query, per_size, scroll)
-    r = connect_with_retry { source_client.search(scroll_option) }
-    total = r['hits']['total']
-    i = 0
-    i += bulk_results(r, dest_client, i, total, dest_index)
+    parallel_array = slice_max ? slice_max.times.to_a : [0]
+    Parallel.map(parallel_array, :in_processes=>parallel_array.size) do |slice_id|
+      scroll_option = get_scroll_option(@index, query, per_size, scroll, slice_id, slice_max)
+      r = connect_with_retry { source_client.search(scroll_option) }
+      total = r['hits']['total']
+      i = 0
+      i += bulk_results(r, dest_client, i, total, dest_index, slice_id)

-    while r = connect_with_retry { source_client.scroll(scroll_id: r['_scroll_id'], scroll: scroll) } and
-      (not r['hits']['hits'].empty?) do
-      i += bulk_results(r, dest_client, i, total, dest_index)
+      while r = connect_with_retry { source_client.scroll(scroll_id: r['_scroll_id'], scroll: scroll) } and
+        (not r['hits']['hits'].empty?) do
+        i += bulk_results(r, dest_client, i, total, dest_index, slice_id)
+      end
     end
   end

   private

-  def bulk_results(results, dest_client, before_size, total, dest_index)
+  def bulk_results(results, dest_client, before_size, total, dest_index, slice_id)
     bulk_message = convert_results(results, dest_index)
     connect_with_retry do
       dest_client.bulk body: bulk_message unless bulk_message.empty?
       to_size = before_size + (bulk_message.size/2)
-      @logger.info "copy complete (#{before_size}-#{to_size})/#{total}"
+      @logger.info "slice_id[#{slice_id}] copy complete (#{before_size}-#{to_size})/#{total}"
     end
     return (bulk_message.size/2)
   end

-  def get_scroll_option(index, query, size, scroll)
+  def get_scroll_option(index, query, size, scroll, slice_id, slice_max)
     body = {}
     body[:query] = { query_string: { query: query } } unless query.nil?
+    body[:slice] = { id: slice_id, max: slice_max } if slice_max
     search_option = { index: index, scroll: scroll, body: body, size: (size || DEFAULT_PER) }
     search_option
   end

   def convert_results(search_results, dest_index)
@@ -205,10 +209,10 @@
       yield if block_given?
     rescue => e
       if retries < retry_on_failure
         retries += 1
         @logger.warn "Could not connect to Elasticsearch, resetting connection and trying again. #{e.message}"
-        sleep 2**retries
+        sleep 10**retries
         retry
       end
       raise "Could not connect to Elasticsearch after #{retries} retries. #{e.message}"
     end
   end