lib/ej/core.rb in ej-0.0.7 vs lib/ej/core.rb in ej-0.0.8

- old
+ new

@@ -6,13 +6,13 @@ require 'hashie' require 'pp' module Ej class Core + DEFAULT_PER = 30000 def initialize(host, index, debug) - @logger = Logger.new($stderr) - @logger.level = debug ? Logger::DEBUG : Logger::INFO + @logger = debug ? Logger.new($stderr) : nil @index = index @client = Elasticsearch::Client.new hosts: host, logger: @logger, index: @index end def search(type, query, size, from, source_only, routing = nil) @@ -30,43 +30,48 @@ body[:query] = { query_string: { query: query } } unless query.nil? @client.search index: @index, type: type, body: body end def move(source, dest, query) - per = 30000 - source_client = Elasticsearch::Client.new hosts: source, index: @index, logger: @logger - dest_client = Elasticsearch::Client.new hosts: dest, logger: @logger + per = DEFAULT_PER num = 0 + logger = Logger.new($stdout) + source_client = Elasticsearch::Client.new hosts: source, index: @index + dest_client = Elasticsearch::Client.new hosts: dest while true from = num * per body = { size: per, from: from } body[:query] = { query_string: { query: query } } unless query.nil? data = Hashie::Mash.new(source_client.search index: @index, body: body) - break if data.hits.hits.empty? + docs = data.hits.hits + total = data.hits.total + break if docs.empty? bulk_message = [] - data.hits.hits.each do |doc| + docs.each do |doc| source = doc.delete('_source') doc.delete('_score') bulk_message << { index: doc.to_h } bulk_message << source end dest_client.bulk body: bulk_message unless bulk_message.empty? + logger.info("move complete #{from + docs.size}/#{total}") num += 1 end end def dump(query) - per = 30000 + per = DEFAULT_PER num = 0 bulk_message = [] while true from = num * per body = { size: per, from: from } body[:query] = { query_string: { query: query } } unless query.nil? data = Hashie::Mash.new(@client.search index: @index, body: body) - break if data.hits.hits.empty? - data.hits.hits.each do |doc| + docs = data.hits.hits + break if docs.empty? + docs.each do |doc| source = doc.delete('_source') doc.delete('_score') bulk_message << Yajl::Encoder.encode({ 'index' => doc.to_h }) bulk_message << Yajl::Encoder.encode(source) end @@ -86,9 +91,21 @@ {"query"=> {"bool"=> {"should"=>[{"query_string"=>{"query"=>query}}]}}, "filter"=>{"bool"=>{"must"=>[{"match_all"=>{}}]}}}}}}}}, "size"=>0} + @client.search index: @index, body: body + end + + def aggs(term, size, query) + body = {"size"=>0, + "query"=> + {"filtered"=> + {"query"=>{"query_string"=>{"query"=> query}}, + "filter"=>{"bool"=>{"must"=>[], "must_not"=>[]}}}}, + "aggs"=> + {"agg_" + term => + {"terms"=>{"field"=>term, "size"=>size, "order"=>{"_count"=>"desc"}}}}} @client.search index: @index, body: body end def min(term) body = {