lib/ej/core.rb in ej-0.1.4 vs lib/ej/core.rb in ej-0.1.5

- old
+ new

@@ -1,7 +1,5 @@
-#!/usr/bin/env ruby
-# coding: utf-8
 require 'yajl'
 require 'elasticsearch'
 require 'hashie'
 require 'parallel'
 require 'logger'
@@ -10,20 +8,17 @@
   disable_warnings if respond_to?(:disable_warnings)
 end
 
 module Ej
   class Core
-    DEFAULT_PER = 1000
 
-    def initialize(host, index, debug)
-      @logger = Logger.new($stderr)
-      @logger.level = debug ? Logger::DEBUG : Logger::INFO
-
-      @index = index
-      @client = Elasticsearch::Client.new hosts: host, logger: @logger, index: @index
+    def initialize(values)
+      @logger = values.logger
+      @index = values.index
+      @client = values.client
     end
 
-    def search(type, query, size, from, source_only, routing = nil, fields = nil, sort = nil)
+    def search(type, query, size, from, meta, routing = nil, fields = nil, sort = nil)
       body = { from: from }
       body[:size] = size unless size.nil?
       if sort
         sorts = []
         sort.each do |k, v|
@@ -34,55 +29,19 @@
       body[:query] = { query_string: { query: query } } unless query.nil?
       search_option = { index: @index, type: type, body: body }
       search_option[:routing] = routing unless routing.nil?
       search_option[:_source] = fields.nil? ? nil : fields.join(',')
       results = HashWrapper.new(@client.search(search_option))
-      source_only ? get_sources(results) : results
+      meta ? results : Util.get_sources(results)
     end
 
     def distinct(term, type, query)
       body = { size: 0, "aggs"=>{ term + "_count"=>{"cardinality"=>{"field"=>term}}}}
       body[:query] = { query_string: { query: query } } unless query.nil?
       @client.search index: @index, type: type, body: body
     end
 
-    def copy(source, dest, query, per_size, proc_num, define_from = 0)
-      per = per_size || DEFAULT_PER
-      logger = Logger.new($stdout)
-      source_client = Elasticsearch::Client.new hosts: source, index: @index
-      dest_client = Elasticsearch::Client.new hosts: dest
-      calculate_body = { size: 0 }
-      calculate_body[:query] = { query_string: { query: query } } unless query.nil?
-      calculate_data = HashWrapper.new(source_client.search index: @index, body: calculate_body)
-      total = calculate_data.hits.total
-      payloads = ((total/per) + 1).times.to_a
-      Parallel.map(payloads, in_processes: proc_num) do |num|
-        from = num * per
-        if from < define_from
-          logger.info("skip index (#{num} #{from}-#{from + per})/#{total}")
-          next
-        end
-        body = { size: per, from: from }
-        body[:query] = { query_string: { query: query } } unless query.nil?
-        data = HashWrapper.new(source_client.search index: @index, body: body)
-        docs = data.hits.hits
-        bulk_message = []
-        docs.each do |doc|
-          source = doc.delete('_source')
-          doc.delete('_score')
-          ['_id', '_type', '_index'].each do |meta_field|
-            source.delete(meta_field)
-          end
-          bulk_message << { index: doc.to_h }
-          bulk_message << source
-        end
-        send_with_retry(dest_client, bulk_message)
-
-        logger.info("copy complete (#{num} #{from}-#{from + docs.size})/#{total}")
-      end
-    end
-
     def dump(query, per_size)
       per = per_size || DEFAULT_PER
       num = 0
       while true
         bulk_message = []
@@ -168,153 +127,90 @@
         }
       }
       @client.search index: @index, body: body, size: 0
     end
 
-    def aliases
-      @client.indices.get_aliases
-    end
-
-    def health
-      @client.cluster.health
-    end
-
-    def state
-      @client.cluster.state
-    end
-
-    def indices
-      @client.cat.indices format: 'json'
-    end
-
-    def stats
-      @client.indices.stats index: @index
-    end
-
-    def put_mapping(index, type, body)
-      @client.indices.create index: index unless @client.indices.exists index: index
-      @client.indices.put_mapping index: index, type: type, body: body
-    end
-
-    def mapping
-      data = @client.indices.get_mapping index: @index
-      @index == '_all' ? data : data[@index]['mappings']
-    end
-
-    def put_template(name, hash)
-      @client.indices.put_template name: name, body: hash
-    end
-
-    def create_aliases(als, indices)
-      actions = []
-      indices.each do |index|
-        actions << { add: { index: index, alias: als } }
-      end
-      @client.indices.update_aliases body: {
-        actions: actions
-      }
-    end
-
-    def recovery
-      @client.indices.recovery index: @index
-    end
-
-    def delete(index, type, query)
-      if query.nil?
-        if type.nil?
-          @client.indices.delete index: index
+    def bulk(timestamp_key, type, add_timestamp, id_keys, index, data)
+      template = id_keys.map { |key| '%s' }.join('_') unless id_keys.nil?
+      bulk_message = []
+      data.each do |record|
+        if timestamp_key.nil?
+          timestamp = Time.now.to_datetime.to_s
         else
-          @client.delete_by_query index: index, type: type, q: '*'
+          timestamp = record[timestamp_key].to_time.to_datetime.to_s
         end
-      else
-        @client.delete_by_query index: index, type: type, q: query
+        record.merge!('@timestamp' => timestamp) if add_timestamp
+        meta = { index: { _index: index, _type: type } }
+        meta[:index][:_id] = Util.generate_id(template, record, id_keys) unless id_keys.nil?
+        bulk_message << meta
+        bulk_message << record
       end
+      connect_with_retry { @client.bulk body: bulk_message unless bulk_message.empty? }
     end
 
-    def template
-      @client.indices.get_template
-    end
+    def copy(source, dest, query, per_size, scroll)
+      source_client = Elasticsearch::Client.new hosts: source
+      dest_client = Elasticsearch::Client.new hosts: dest
 
-    def delete_template(name)
-      @client.indices.delete_template name: name
-    end
+      scroll_option = get_scroll_option(@index, query, per_size, scroll)
+      r = connect_with_retry { source_client.search(scroll_option) }
+      total = r['hits']['total']
+      i = 0
+      i += bulk_results(r, dest_client, i, total)
 
-    def settings
-      @client.indices.get_settings
+      while r = connect_with_retry { source_client.scroll(scroll_id: r['_scroll_id'], scroll: scroll) } and
+          (not r['hits']['hits'].empty?) do
+        i += bulk_results(r, dest_client, i, total)
+      end
     end
 
-    def warmer
-      @client.indices.get_warmer index: @index
-    end
+    private
 
-    def refresh
-      @client.indices.refresh index: @index
+    def bulk_results(results, dest_client, before_size, total)
+      bulk_message = convert_results(results)
+      connect_with_retry do
+        dest_client.bulk body: bulk_message unless bulk_message.empty?
+        to_size = before_size + (bulk_message.size/2)
+        @logger.info "copy complete (#{before_size}-#{to_size})/#{total}"
+      end
+      return (bulk_message.size/2)
    end
 
-    def nodes_info
-      @client.nodes.info
+    def get_scroll_option(index, query, size, scroll)
+      body = {}
+      body[:query] = { query_string: { query: query } } unless query.nil?
+      search_option = { index: index, scroll: scroll, body: body, size: (size || DEFAULT_PER) }
+      search_option
     end
 
-    def nodes_stats
-      @client.nodes.stats
-    end
-
-    def bulk(timestamp_key, type, add_timestamp, id_keys, index)
-      data = parse_json(STDIN.read)
-      template = id_keys.map { |key| '%s' }.join('_') unless id_keys.nil?
+    def convert_results(search_results)
+      data = HashWrapper.new(search_results)
+      docs = data.hits.hits
       bulk_message = []
-      data.each do |record|
-        if timestamp_key.nil?
-          timestamp = Time.now.to_datetime.to_s
-        else
-          timestamp = record[timestamp_key].to_time.to_datetime.to_s
+      docs.each do |doc|
+        source = doc.delete('_source')
+        doc.delete('_score')
+        ['_id', '_type', '_index'].each do |meta_field|
+          source.delete(meta_field)
        end
-        record.merge!('@timestamp' => timestamp) if add_timestamp
-        meta = { index: { _index: index, _type: type } }
-        meta[:index][:_id] = generate_id(template, record, id_keys) unless id_keys.nil?
-        bulk_message << meta
-        bulk_message << record
+        bulk_message << { index: doc.to_h }
+        bulk_message << source
      end
-      bulk_message.each_slice(10000).each do |block|
-        send_with_retry(@client, block)
-      end
+      bulk_message
    end
 
-    private
-
-    def send_with_retry(client, bulk_message, retry_on_failure = 5)
+    def connect_with_retry(retry_on_failure = 5)
      retries = 0
      begin
-        client.bulk body: bulk_message unless bulk_message.empty?
+        yield if block_given?
      rescue => e
        if retries < retry_on_failure
          retries += 1
-          @logger.warn "Could not push logs to Elasticsearch, resetting connection and trying again. #{e.message}"
+          @logger.warn "Could not connect to Elasticsearch, resetting connection and trying again. #{e.message}"
          sleep 2**retries
          retry
        end
-        raise "Could not push logs to Elasticsearch after #{retries} retries. #{e.message}"
+        raise "Could not connect to Elasticsearch after #{retries} retries. #{e.message}"
      end
    end
-
-    def parse_json(buffer)
-      begin
-        data = Yajl::Parser.parse(buffer)
-      rescue => e
-        data = []
-        buffer.split("\n").each do |line|
-          data << Yajl::Parser.parse(line)
-        end
-      end
-      data.class == Array ? data : [data]
-    end
-
-    def generate_id(template, record, id_keys)
-      template % id_keys.map { |key| record[key] }
-    end
-
-    def get_sources(results)
-      results.hits.hits.map { |result| result._source }
-    end
  end
end
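
A few usage sketches against the 0.1.5 API follow; any name or host not shown in the diff above is an assumption, not the gem's code.

The constructor no longer builds its own Elasticsearch::Client from host/index/debug; it expects a pre-wired object that responds to logger, index and client. A minimal sketch in Ruby, assuming a Struct stand-in for that object (the gem's real class backing it is not shown in this diff):

    require 'logger'
    require 'elasticsearch'
    require 'ej'

    # Stand-in for the values object; Ej::Core#initialize in 0.1.5 only
    # needs #logger, #index and #client. The gem's actual class may differ.
    Values = Struct.new(:logger, :index, :client)

    logger = Logger.new($stderr)
    client = Elasticsearch::Client.new(hosts: 'localhost:9200', logger: logger)
    core   = Ej::Core.new(Values.new(logger, 'my_index', client))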
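
search swaps the source_only flag for meta and inverts its sense: a truthy meta returns the raw response, otherwise Util.get_sources strips it down to the _source bodies. An illustrative call, reusing the core object from the sketch above:

    # Only the _source hashes, no _index/_type/_score metadata:
    sources = core.search('log', 'user:alice', 10, 0, false)

    # Full Elasticsearch response, hits metadata included:
    raw = core.search('log', 'user:alice', 10, 0, true)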
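
bulk no longer reads and parses STDIN itself (parse_json is gone); the caller passes the already-parsed records in as data, and ID generation moved out to Util.generate_id. A hedged sketch with illustrative records:

    records = [
      { 'user' => 'alice', 'created_at' => '2016-02-01T00:00:00Z' },
      { 'user' => 'bob',   'created_at' => '2016-02-02T00:00:00Z' }
    ]
    # bulk(timestamp_key, type, add_timestamp, id_keys, index, data):
    # stamps each record with @timestamp from created_at and derives _id
    # from the 'user' field.
    core.bulk('created_at', 'log', true, ['user'], 'my_index', records)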
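
copy drops the from/size pagination and the Parallel worker pool in favour of the scroll API: one search opens a server-side cursor, then repeated scroll calls drain it batch by batch, each batch re-indexed via bulk_results under connect_with_retry's exponential backoff (sleep 2**retries, i.e. 2, 4, 8... seconds over up to five attempts). An illustrative call of the new signature copy(source, dest, query, per_size, scroll); the hosts and the '5m' keep-alive are example values:

    # Copy every document of core's index between clusters, 500 per batch,
    # keeping each scroll context alive for five minutes:
    core.copy('source-host:9200', 'dest-host:9200', nil, 500, '5m')

    # Restrict the copy to documents matching a query_string query:
    core.copy('source-host:9200', 'dest-host:9200', 'status:active', 500, '5m')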