lib/ej/core.rb in ej-0.1.0 vs lib/ej/core.rb in ej-0.1.1
- old
+ new
@@ -3,16 +3,19 @@
require 'yaml'
require 'yajl'
require 'elasticsearch'
require 'hashie'
require 'pp'
+require 'parallel'

module Ej
  class Core
    DEFAULT_PER = 1000

    def initialize(host, index, debug)
-      @logger = debug ? Logger.new($stderr) : nil
+      @logger = Logger.new($stderr)
+      @logger.level = debug ? Logger::DEBUG : Logger::INFO
+
      @index = index
      @client = Elasticsearch::Client.new hosts: host, logger: @logger, index: @index
    end
    def search(type, query, size, from, source_only, routing = nil, fields = nil, sort = nil)
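Note: in 0.1.0 the Elasticsearch client received no logger at all unless debug was set; 0.1.1 always attaches a $stderr logger and gates verbosity through the log level instead. This also guarantees @logger is non-nil when the new send_with_retry (added below) calls @logger.warn. A minimal sketch of the level gating, using only the stdlib Logger:

    require 'logger'

    logger = Logger.new($stderr)
    logger.level = Logger::INFO    # what 0.1.1 sets when debug is false

    logger.debug('request trace')  # suppressed at INFO
    logger.warn('retrying bulk')   # still emitted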
@@ -37,37 +40,40 @@
      body = { size: 0, "aggs"=>{ term + "_count"=>{"cardinality"=>{"field"=>term}}}}
      body[:query] = { query_string: { query: query } } unless query.nil?
      @client.search index: @index, type: type, body: body
    end

-    def copy(source, dest, query, per_size)
+    def copy(source, dest, query, per_size, proc_num)
      per = per_size || DEFAULT_PER
      num = 0
      logger = Logger.new($stdout)
      source_client = Elasticsearch::Client.new hosts: source, index: @index
      dest_client = Elasticsearch::Client.new hosts: dest
-      while true
+      calculate_body = { size: 0 }
+      calculate_body[:query] = { query_string: { query: query } } unless query.nil?
+      calculate_data = Hashie::Mash.new(source_client.search index: @index, body: calculate_body)
+      total = calculate_data.hits.total
+      payloads = ((total/per) + 1).times.to_a
+      Parallel.map(payloads, in_processes: proc_num) do |num|
        from = num * per
        body = { size: per, from: from }
        body[:query] = { query_string: { query: query } } unless query.nil?
        data = Hashie::Mash.new(source_client.search index: @index, body: body)
        docs = data.hits.hits
-        total = data.hits.total
-        break if docs.empty?
        bulk_message = []
        docs.each do |doc|
          source = doc.delete('_source')
          doc.delete('_score')
          ['_id', '_type', '_index'].each do |meta_field|
            source.delete(meta_field)
          end
          bulk_message << { index: doc.to_h }
          bulk_message << source
        end
-        dest_client.bulk body: bulk_message unless bulk_message.empty?
-        logger.info("copy complete #{from + docs.size}/#{total}")
-        num += 1
+        send_with_retry(dest_client, bulk_message)
+
+        logger.info("copy complete (#{from}-#{from + docs.size})/#{total}")
      end
    end

    def dump(query, per_size)
      per = per_size || DEFAULT_PER
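The copy rewrite replaces the open-ended `while true` offset loop with a precomputed page list fanned out across worker processes via the parallel gem (hence the new require and the new proc_num argument): a size-0 pre-query fetches the total hit count, and each page becomes an independent unit of work. A rough sketch of the partitioning arithmetic, with hypothetical values for total and per:

    require 'parallel'

    total = 10_250   # hypothetical hit count from the size-0 pre-query
    per   = 1_000    # page size (per_size or DEFAULT_PER)

    # ((total / per) + 1) pages: [0, 1, ..., 10]. When total divides
    # evenly this yields one trailing empty page, which is harmless
    # because send_with_retry skips empty bulk bodies.
    payloads = ((total / per) + 1).times.to_a

    Parallel.map(payloads, in_processes: 4) do |num|
      from = num * per   # 0, 1000, ..., 10000
      # each worker fetches [from, from + per) and bulk-indexes it
    end

One trade-off versus the old loop: progress is now reported per page range rather than cumulatively, and deep from/size pagination grows expensive (and, on later Elasticsearch versions, is capped by index.max_result_window) for very large indices.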
@@ -241,14 +247,29 @@
        meta[:index][:_id] = generate_id(template, record, id_keys) unless id_keys.nil?
        bulk_message << meta
        bulk_message << record
      end
      bulk_message.each_slice(10000).each do |block|
-        @client.bulk body: block
+        send_with_retry(@client, block)
      end
    end
    private
+
+    def send_with_retry(client, bulk_message, retry_on_failure = 5)
+      retries = 0
+      begin
+        client.bulk body: bulk_message unless bulk_message.empty?
+      rescue => e
+        if retries < retry_on_failure
+          retries += 1
+          @logger.warn "Could not push logs to Elasticsearch, resetting connection and trying again. #{e.message}"
+          sleep 2**retries
+          retry
+        end
+        raise "Could not push logs to Elasticsearch after #{retries} retries. #{e.message}"
+      end
+    end

    def parse_json(buffer)
      begin
        data = Yajl::Parser.parse(buffer)
      rescue => e
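Both bulk paths (copy and bulk indexing) now funnel through send_with_retry, which applies exponential backoff: up to five retries sleeping 2, 4, 8, 16, and 32 seconds (about a minute in total) before re-raising. Despite the log wording, nothing actually resets the connection; the rescue only sleeps and retries. The pattern in isolation, with flaky_operation as a hypothetical stand-in for client.bulk:

    require 'logger'

    # Minimal sketch of the backoff pattern used by send_with_retry.
    def with_retry(logger, retry_on_failure = 5)
      retries = 0
      begin
        yield
      rescue => e
        if retries < retry_on_failure
          retries += 1
          logger.warn "attempt #{retries} failed (#{e.message}); sleeping #{2**retries}s"
          sleep 2**retries
          retry
        end
        raise "gave up after #{retries} retries: #{e.message}"
      end
    end

    with_retry(Logger.new($stderr)) { flaky_operation }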