lib/ej/core.rb in ej-0.1.11 vs lib/ej/core.rb in ej-0.1.12
- old
+ new
@@ -1,6 +1,6 @@
-require 'yajl'
+require 'json'
require 'elasticsearch'
require 'hashie'
require 'parallel'
require 'logger'
@@ -52,12 +52,12 @@
docs = data.hits.hits
break if docs.empty?
docs.each do |doc|
source = doc.delete('_source')
doc.delete('_score')
- bulk_message << Yajl::Encoder.encode({ 'index' => doc.to_h })
- bulk_message << Yajl::Encoder.encode(source)
+ bulk_message << JSON.dump({ 'index' => doc.to_h })
+ bulk_message << JSON.dump(source)
end
num += 1
puts bulk_message.join("\n")
end
end
@@ -145,41 +145,45 @@
bulk_message << record
end
connect_with_retry { @client.bulk body: bulk_message unless bulk_message.empty? }
end
# Copy documents from one Elasticsearch cluster/index to another using the
# scroll API, optionally fanning out over sliced scroll (one OS process per
# slice via Parallel).
#
# source/dest  - host specs accepted by Elasticsearch::Client (hosts:).
# query        - optional query_string query to restrict the copied docs.
# per_size     - page size per scroll request (nil falls back to DEFAULT_PER
#                inside get_scroll_option).
# scroll       - scroll keep-alive (e.g. "5m").
# dest_index   - target index name passed through to convert_results.
# slice_max    - number of scroll slices; nil disables slicing (single worker).
#
# NOTE(review): the two clients are built before Parallel forks worker
# processes — presumably the HTTP connections are re-established lazily per
# process; verify against the elasticsearch-transport adapter in use.
def copy(source, dest, query, per_size, scroll, dest_index, slice_max)
  source_client = Elasticsearch::Client.new hosts: source
  dest_client = Elasticsearch::Client.new hosts: dest
  slice_ids = slice_max ? (0...slice_max).to_a : [0]
  Parallel.map(slice_ids, in_processes: slice_ids.size) do |slice_id|
    options = get_scroll_option(@index, query, per_size, scroll, slice_id, slice_max)
    response = connect_with_retry { source_client.search(options) }
    total = response['hits']['total']
    copied = bulk_results(response, dest_client, 0, total, dest_index, slice_id)
    loop do
      response = connect_with_retry do
        source_client.scroll(scroll_id: response['_scroll_id'], scroll: scroll)
      end
      # Stop once the scroll cursor is exhausted (nil response or empty page).
      break unless response && !response['hits']['hits'].empty?
      copied += bulk_results(response, dest_client, copied, total, dest_index, slice_id)
    end
  end
end
private
# Convert one scroll page into a bulk payload, ship it to the destination
# cluster, and log per-slice progress.
#
# results     - raw search/scroll response hash.
# dest_client - Elasticsearch::Client for the destination cluster.
# before_size - number of documents copied so far (for the progress log).
# total       - total hit count reported by the initial search.
# dest_index  - target index passed through to convert_results.
# slice_id    - slice identifier, included in the log line only.
#
# Returns the number of documents in this page (bulk_message holds an
# action line plus a source line per document, hence size / 2).
def bulk_results(results, dest_client, before_size, total, dest_index, slice_id)
  bulk_message = convert_results(results, dest_index)
  connect_with_retry do
    dest_client.bulk body: bulk_message unless bulk_message.empty?
    upto = before_size + (bulk_message.size / 2)
    @logger.info "slice_id[#{slice_id}] copy complete (#{before_size}-#{upto})/#{total}"
  end
  bulk_message.size / 2
end
# Build the option hash for the initial scroll search request.
#
# index     - index (or pattern) to search.
# query     - optional query_string; omitted from the body when nil.
# size      - page size; falls back to DEFAULT_PER when nil.
# scroll    - scroll keep-alive string.
# slice_id/slice_max - sliced-scroll coordinates; the :slice body key is
#                      only emitted when slice_max is truthy.
def get_scroll_option(index, query, size, scroll, slice_id, slice_max)
  body = {}
  body[:query] = { query_string: { query: query } } unless query.nil?
  body[:slice] = { id: slice_id, max: slice_max } if slice_max
  { index: index, scroll: scroll, body: body, size: size || DEFAULT_PER }
end
def convert_results(search_results, dest_index)
@@ -205,10 +209,10 @@
yield if block_given?
rescue => e
if retries < retry_on_failure
retries += 1
@logger.warn "Could not connect to Elasticsearch, resetting connection and trying again. #{e.message}"
- sleep 2**retries
+ sleep 10**retries
retry
end
raise "Could not connect to Elasticsearch after #{retries} retries. #{e.message}"
end
end