lib/ej/core.rb in ej-0.1.4 vs lib/ej/core.rb in ej-0.1.5
- old (line as of ej-0.1.4, removed)
+ new (line as of ej-0.1.5, added)
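Summary of the changes in this file: the shebang and encoding magic comment are dropped; Core#initialize now receives a pre-built values object (logger, index, client) instead of constructing its own Elasticsearch client from host/index/debug; search renames and inverts its source_only flag to meta, delegating source extraction to Util.get_sources; copy is rewritten from a Parallel/from-size implementation onto the scroll API; bulk takes its records as a parameter instead of reading STDIN; and the cluster/index administration helpers (aliases, health, state, indices, stats, put_mapping, mapping, templates, settings, warmer, refresh, recovery, nodes info/stats, delete) are removed from this class, presumably relocated elsewhere in the gem (this diff alone does not show where).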
@@ -1,7 +1,5 @@
-#!/usr/bin/env ruby
-# coding: utf-8
require 'yajl'
require 'elasticsearch'
require 'hashie'
require 'parallel'
require 'logger'
@@ -10,20 +8,17 @@
disable_warnings if respond_to?(:disable_warnings)
end
module Ej
class Core
- DEFAULT_PER = 1000
- def initialize(host, index, debug)
- @logger = Logger.new($stderr)
- @logger.level = debug ? Logger::DEBUG : Logger::INFO
-
- @index = index
- @client = Elasticsearch::Client.new hosts: host, logger: @logger, index: @index
+ def initialize(values)
+ @logger = values.logger
+ @index = values.index
+ @client = values.client
end
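The constructor no longer builds its own client or logger. A minimal sketch of the new construction path, assuming the ej gem is loaded and using OpenStruct as a hypothetical stand-in for whatever values object ej actually passes in (it only needs to respond to #logger, #index, and #client):

    require 'elasticsearch'
    require 'logger'
    require 'ostruct'

    logger = Logger.new($stderr)
    values = OpenStruct.new(
      logger: logger,
      index:  'my_index',   # hypothetical index name
      client: Elasticsearch::Client.new(hosts: 'localhost:9200', logger: logger)
    )
    core = Ej::Core.new(values)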
- def search(type, query, size, from, source_only, routing = nil, fields = nil, sort = nil)
+ def search(type, query, size, from, meta, routing = nil, fields = nil, sort = nil)
body = { from: from }
body[:size] = size unless size.nil?
if sort
sorts = []
sort.each do |k, v|
@@ -34,55 +29,19 @@
body[:query] = { query_string: { query: query } } unless query.nil?
search_option = { index: @index, type: type, body: body }
search_option[:routing] = routing unless routing.nil?
search_option[:_source] = fields.nil? ? nil : fields.join(',')
results = HashWrapper.new(@client.search(search_option))
- source_only ? get_sources(results) : results
+ meta ? results : Util.get_sources(results)
end
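Callers should note that the renamed flag is also inverted: passing true as source_only previously returned bare _source documents, while passing true as meta now returns the full response envelope, so existing call sites must negate the argument. Roughly:

    # 0.1.4: core.search(type, query, size, from, true)   # source_only => bare documents
    # 0.1.5: core.search(type, query, size, from, false)  # meta false  => bare documents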
def distinct(term, type, query)
body = { size: 0, "aggs"=>{ term + "_count"=>{"cardinality"=>{"field"=>term}}}}
body[:query] = { query_string: { query: query } } unless query.nil?
@client.search index: @index, type: type, body: body
end
- def copy(source, dest, query, per_size, proc_num, define_from = 0)
- per = per_size || DEFAULT_PER
- logger = Logger.new($stdout)
- source_client = Elasticsearch::Client.new hosts: source, index: @index
- dest_client = Elasticsearch::Client.new hosts: dest
- calculate_body = { size: 0 }
- calculate_body[:query] = { query_string: { query: query } } unless query.nil?
- calculate_data = HashWrapper.new(source_client.search index: @index, body: calculate_body)
- total = calculate_data.hits.total
- payloads = ((total/per) + 1).times.to_a
- Parallel.map(payloads, in_processes: proc_num) do |num|
- from = num * per
- if from < define_from
- logger.info("skip index (#{num} #{from}-#{from + per})/#{total}")
- next
- end
- body = { size: per, from: from }
- body[:query] = { query_string: { query: query } } unless query.nil?
- data = HashWrapper.new(source_client.search index: @index, body: body)
- docs = data.hits.hits
- bulk_message = []
- docs.each do |doc|
- source = doc.delete('_source')
- doc.delete('_score')
- ['_id', '_type', '_index'].each do |meta_field|
- source.delete(meta_field)
- end
- bulk_message << { index: doc.to_h }
- bulk_message << source
- end
- send_with_retry(dest_client, bulk_message)
-
- logger.info("copy complete (#{num} #{from}-#{from + docs.size})/#{total}")
- end
- end
-
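This removes the Parallel-based copy entirely: from/size pagination degrades as from grows (deep paging), and each worker process re-ran the query for its page. The method is rebuilt below on Elasticsearch's scroll API.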
def dump(query, per_size)
per = per_size || DEFAULT_PER
num = 0
while true
bulk_message = []
@@ -168,153 +127,90 @@
}
}
@client.search index: @index, body: body, size: 0
end
- def aliases
- @client.indices.get_aliases
- end
-
- def health
- @client.cluster.health
- end
-
- def state
- @client.cluster.state
- end
-
- def indices
- @client.cat.indices format: 'json'
- end
-
- def stats
- @client.indices.stats index: @index
- end
-
- def put_mapping(index, type, body)
- @client.indices.create index: index unless @client.indices.exists index: index
- @client.indices.put_mapping index: index, type: type, body: body
- end
-
- def mapping
- data = @client.indices.get_mapping index: @index
- @index == '_all' ? data : data[@index]['mappings']
- end
-
- def put_template(name, hash)
- @client.indices.put_template name: name, body: hash
- end
-
- def create_aliases(als, indices)
- actions = []
- indices.each do |index|
- actions << { add: { index: index, alias: als } }
- end
- @client.indices.update_aliases body: {
- actions: actions
- }
- end
-
- def recovery
- @client.indices.recovery index: @index
- end
-
- def delete(index, type, query)
- if query.nil?
- if type.nil?
- @client.indices.delete index: index
+ def bulk(timestamp_key, type, add_timestamp, id_keys, index, data)
+ template = id_keys.map { |key| '%s' }.join('_') unless id_keys.nil?
+ bulk_message = []
+ data.each do |record|
+ if timestamp_key.nil?
+ timestamp = Time.now.to_datetime.to_s
else
- @client.delete_by_query index: index, type: type, q: '*'
+ timestamp = record[timestamp_key].to_time.to_datetime.to_s
end
- else
- @client.delete_by_query index: index, type: type, q: query
+ record.merge!('@timestamp' => timestamp) if add_timestamp
+ meta = { index: { _index: index, _type: type } }
+ meta[:index][:_id] = Util.generate_id(template, record, id_keys) unless id_keys.nil?
+ bulk_message << meta
+ bulk_message << record
end
+ connect_with_retry { @client.bulk body: bulk_message unless bulk_message.empty? }
end
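bulk no longer reads or parses STDIN itself; the caller passes in already-parsed records, which is also why the private parse_json helper disappears at the end of this diff. A hedged usage sketch (index, type, and record shape invented for illustration; Util.generate_id is assumed to behave like the removed private generate_id):

    records = [{ 'user' => 'alice', 'created_at' => '2016-01-01 12:00:00' }]
    # ['user'] as id_keys should make the document _id 'alice';
    # add_timestamp true stamps each record with @timestamp from created_at.
    core.bulk('created_at', 'mytype', true, ['user'], 'myindex', records)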
- def template
- @client.indices.get_template
- end
+ def copy(source, dest, query, per_size, scroll)
+ source_client = Elasticsearch::Client.new hosts: source
+ dest_client = Elasticsearch::Client.new hosts: dest
- def delete_template(name)
- @client.indices.delete_template name: name
- end
+ scroll_option = get_scroll_option(@index, query, per_size, scroll)
+ r = connect_with_retry { source_client.search(scroll_option) }
+ total = r['hits']['total']
+ i = 0
+ i += bulk_results(r, dest_client, i, total)
- def settings
- @client.indices.get_settings
+ while r = connect_with_retry { source_client.scroll(scroll_id: r['_scroll_id'], scroll: scroll) } and
+ (not r['hits']['hits'].empty?) do
+ i += bulk_results(r, dest_client, i, total)
+ end
end
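copy is now scroll-based: one initial search opens the scroll cursor, then scroll() is called repeatedly with the previous response's _scroll_id until an empty page comes back, and bulk_results (below) re-indexes each page into the destination cluster. A hedged invocation sketch, given a core built as above (hosts, query, and sizes invented):

    # Copy matching docs in pages of 500, keeping each scroll
    # context alive for one minute between page fetches.
    core.copy('src.example:9200', 'dest.example:9200', 'status:active', 500, '1m')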
- def warmer
- @client.indices.get_warmer index: @index
- end
+ private
- def refresh
- @client.indices.refresh index: @index
+ def bulk_results(results, dest_client, before_size, total)
+ bulk_message = convert_results(results)
+ connect_with_retry do
+ dest_client.bulk body: bulk_message unless bulk_message.empty?
+ to_size = before_size + (bulk_message.size/2)
+ @logger.info "copy complete (#{before_size}-#{to_size})/#{total}"
+ end
+ return (bulk_message.size/2)
end
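Since bulk_message alternates action metadata and document bodies (see convert_results below), bulk_message.size/2 is the number of documents shipped; that is the value bulk_results returns and what the progress log reports.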
- def nodes_info
- @client.nodes.info
+ def get_scroll_option(index, query, size, scroll)
+ body = {}
+ body[:query] = { query_string: { query: query } } unless query.nil?
+ search_option = { index: index, scroll: scroll, body: body, size: (size || DEFAULT_PER) }
+ search_option
end
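Note that DEFAULT_PER is still referenced here, and by the unchanged dump method above, even though its definition is removed from the top of the class in this diff; it is presumably re-defined somewhere outside the hunks shown.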
- def nodes_stats
- @client.nodes.stats
- end
-
- def bulk(timestamp_key, type, add_timestamp, id_keys, index)
- data = parse_json(STDIN.read)
- template = id_keys.map { |key| '%s' }.join('_') unless id_keys.nil?
+ def convert_results(search_results)
+ data = HashWrapper.new(search_results)
+ docs = data.hits.hits
bulk_message = []
- data.each do |record|
- if timestamp_key.nil?
- timestamp = Time.now.to_datetime.to_s
- else
- timestamp = record[timestamp_key].to_time.to_datetime.to_s
+ docs.each do |doc|
+ source = doc.delete('_source')
+ doc.delete('_score')
+ ['_id', '_type', '_index'].each do |meta_field|
+ source.delete(meta_field)
end
- record.merge!('@timestamp' => timestamp) if add_timestamp
- meta = { index: { _index: index, _type: type } }
- meta[:index][:_id] = generate_id(template, record, id_keys) unless id_keys.nil?
- bulk_message << meta
- bulk_message << record
+ bulk_message << { index: doc.to_h }
+ bulk_message << source
end
- bulk_message.each_slice(10000).each do |block|
- send_with_retry(@client, block)
- end
+ bulk_message
end
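convert_results is the per-document loop extracted from the old copy: it strips _score from each hit and the redundant _id/_type/_index keys from the body, then emits the alternating action/document pairs the bulk API expects, roughly (values invented for illustration):

    # [{ index: { '_index' => 'logs', '_type' => 'event', '_id' => '1' } },
    #  { 'message' => '...' },
    #  ...]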
- private
-
- def send_with_retry(client, bulk_message, retry_on_failure = 5)
+ def connect_with_retry(retry_on_failure = 5)
retries = 0
begin
- client.bulk body: bulk_message unless bulk_message.empty?
+ yield if block_given?
rescue => e
if retries < retry_on_failure
retries += 1
- @logger.warn "Could not push logs to Elasticsearch, resetting connection and trying again. #{e.message}"
+ @logger.warn "Could not connect to Elasticsearch, resetting connection and trying again. #{e.message}"
sleep 2**retries
retry
end
- raise "Could not push logs to Elasticsearch after #{retries} retries. #{e.message}"
+ raise "Could not connect to Elasticsearch after #{retries} retries. #{e.message}"
end
end
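send_with_retry generalizes into connect_with_retry: rather than being hard-wired to a bulk push, it now yields to any block and retries it with exponential backoff (sleep 2**retries, i.e. 2, 4, 8, 16, 32 seconds) before raising; the new copy wraps its search, scroll, and bulk calls in it, and the new bulk wraps its @client.bulk call the same way.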
-
- def parse_json(buffer)
- begin
- data = Yajl::Parser.parse(buffer)
- rescue => e
- data = []
- buffer.split("\n").each do |line|
- data << Yajl::Parser.parse(line)
- end
- end
- data.class == Array ? data : [data]
- end
-
- def generate_id(template, record, id_keys)
- template % id_keys.map { |key| record[key] }
- end
-
- def get_sources(results)
- results.hits.hits.map { |result| result._source }
- end
-
end
end