lib/ej/core.rb in ej-0.1.0 vs lib/ej/core.rb in ej-0.1.1

- old (ej-0.1.0)
+ new (ej-0.1.1)

@@ -3,16 +3,19 @@
 require 'yaml'
 require 'yajl'
 require 'elasticsearch'
 require 'hashie'
 require 'pp'
+require 'parallel'

 module Ej
   class Core
     DEFAULT_PER = 1000
     def initialize(host, index, debug)
-      @logger = debug ? Logger.new($stderr) : nil
+      @logger = Logger.new($stderr)
+      @logger.level = debug ? Logger::DEBUG : Logger::INFO
+
       @index = index
       @client = Elasticsearch::Client.new hosts: host, logger: @logger, index: @index
     end

     def search(type, query, size, from, source_only, routing = nil, fields = nil, sort = nil)
@@ -37,37 +40,40 @@
       body = { size: 0, "aggs"=>{ term + "_count"=>{"cardinality"=>{"field"=>term}}}}
       body[:query] = { query_string: { query: query } } unless query.nil?
       @client.search index: @index, type: type, body: body
     end

-    def copy(source, dest, query, per_size)
+    def copy(source, dest, query, per_size, proc_num)
       per = per_size || DEFAULT_PER
       num = 0
       logger = Logger.new($stdout)
       source_client = Elasticsearch::Client.new hosts: source, index: @index
       dest_client = Elasticsearch::Client.new hosts: dest
-      while true
+      calculate_body = { size: 0 }
+      calculate_body[:query] = { query_string: { query: query } } unless query.nil?
+      calculate_data = Hashie::Mash.new(source_client.search index: @index, body: calculate_body)
+      total = calculate_data.hits.total
+      payloads = ((total/per) + 1).times.to_a
+      Parallel.map(payloads, in_processes: proc_num) do |num|
         from = num * per
         body = { size: per, from: from }
         body[:query] = { query_string: { query: query } } unless query.nil?
         data = Hashie::Mash.new(source_client.search index: @index, body: body)
         docs = data.hits.hits
-        total = data.hits.total
-        break if docs.empty?
         bulk_message = []
         docs.each do |doc|
           source = doc.delete('_source')
           doc.delete('_score')
           ['_id', '_type', '_index'].each do |meta_field|
             source.delete(meta_field)
           end
           bulk_message << { index: doc.to_h }
           bulk_message << source
         end
-        dest_client.bulk body: bulk_message unless bulk_message.empty?
-        logger.info("copy complete #{from + docs.size}/#{total}")
-        num += 1
+        send_with_retry(dest_client, bulk_message)
+
+        logger.info("copy complete (#{from}-#{from + docs.size})/#{total}")
       end
     end

     def dump(query, per_size)
       per = per_size || DEFAULT_PER
@@ -241,14 +247,29 @@
         meta[:index][:_id] = generate_id(template, record, id_keys) unless id_keys.nil?
         bulk_message << meta
         bulk_message << record
       end
       bulk_message.each_slice(10000).each do |block|
-        @client.bulk body: block
+        send_with_retry(@client, block)
       end
     end

     private
+
+    def send_with_retry(client, bulk_message, retry_on_failure = 5)
+      retries = 0
+      begin
+        client.bulk body: bulk_message unless bulk_message.empty?
+      rescue => e
+        if retries < retry_on_failure
+          retries += 1
+          @logger.warn "Could not push logs to Elasticsearch, resetting connection and trying again. #{e.message}"
+          sleep 2**retries
+          retry
+        end
+        raise "Could not push logs to Elasticsearch after #{retries} retries. #{e.message}"
+      end
+    end
     def parse_json(buffer)
       begin
         data = Yajl::Parser.parse(buffer)
       rescue => e
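
Notes on the changes (the snippets below are illustrative sketches, not part of the gem).

The initialize change means a Logger is now always constructed; the debug flag selects the verbosity instead of toggling logging off entirely. Since the same logger is handed to Elasticsearch::Client, the client now logs its requests at INFO even without the debug flag. A minimal sketch of the pattern:

    require 'logger'

    debug  = false
    logger = Logger.new($stderr)
    logger.level = debug ? Logger::DEBUG : Logger::INFO

    logger.debug('suppressed at INFO level')
    logger.info('always emitted')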
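copy no longer loops page by page until an empty result: it first issues a size: 0 search to read hits.total, derives all page numbers up front, and fans them out to worker processes with the parallel gem (hence the new proc_num argument). A sketch of that partitioning, with example values and the search-and-bulk round trip stubbed out as a comment:

    require 'parallel'

    total = 2500   # as returned in hits.total by the size: 0 count query
    per   = 1000   # page size (DEFAULT_PER)
    pages = ((total / per) + 1).times.to_a   # integer division => [0, 1, 2]

    Parallel.map(pages, in_processes: 2) do |num|
      from = num * per
      # in ej: search with { size: per, from: from }, then bulk-index the hits
      "copied docs #{from}..#{[from + per, total].min}"
    end

When total is an exact multiple of per, this schedules one trailing empty page; that is harmless because the new bulk sender skips empty message arrays.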
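Finally, send_with_retry wraps every bulk call in a retry loop with exponential backoff: up to retry_on_failure attempts (default 5), sleeping 2**retries seconds between them (2, 4, 8, 16, 32), then re-raising. The shape of the loop, reduced to a standalone sketch (with_backoff is a hypothetical name):

    def with_backoff(retry_on_failure = 5)
      retries = 0
      begin
        yield
      rescue => e
        if retries < retry_on_failure
          retries += 1
          sleep 2**retries
          retry
        end
        raise "giving up after #{retries} retries: #{e.message}"
      end
    end

    attempts = 0
    with_backoff do
      attempts += 1
      raise 'transient bulk failure' if attempts < 3  # fails twice, then succeeds
    end

One consequence worth noting: a permanently failing batch sleeps 2 + 4 + 8 + 16 + 32 = 62 seconds before the final raise.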