lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.2.1 vs lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.3.0
- lines appear only in 0.2.1 (old)
+ lines appear only in 0.3.0 (new)
@@ -1,13 +1,16 @@
require 'excon'
require 'elasticsearch'
+require_relative 'elasticsearch/connection'
+require_relative 'elasticsearch/input_thread'
module Embulk
module Input
class Elasticsearch < InputPlugin
Plugin.register_input("elasticsearch", self)
+ ADD_QUERY_TO_RECORD_KEY = 'query'
def self.transaction(config, &control)
task = {
"nodes" => config.param("nodes", :array),
"request_timeout" => config.param("request_timeout", :integer, default: 60),
@@ -23,162 +26,99 @@
"sort" => config.param("sort", :hash, default: nil),
"add_query_to_record" => config.param("add_query_to_record", :bool, default: false)
}
# TODO: want max_threads
define_num_threads = config.param("num_threads", :integer, default: 1)
- task['slice_queries'] = get_slice_from_num_threads(task['queries'], define_num_threads)
+ task['slice_queries'] = InputThread.get_slice_from_num_threads(task['queries'], define_num_threads)
columns = []
task['fields'].each_with_index{ |field, i|
columns << Column.new(i, field['name'], field['type'].to_sym)
}
if task['add_query_to_record']
- columns << Column.new(task['fields'].size, "query", :string)
+ columns << Column.new(task['fields'].size, ADD_QUERY_TO_RECORD_KEY, :string)
end
resume(task, columns, task['slice_queries'].size, &control)
end
- def self.get_slice_from_num_threads(array, define_num_threads)
- num_threads = array.size < define_num_threads ? array.size : define_num_threads
- per_queries = if (array.size % num_threads) == 0
- (array.size / num_threads)
- else
- (array.size / num_threads) + 1
- end
- sliced = array.each_slice(per_queries).to_a
- Embulk.logger.info("calculate num threads => #{sliced.size}")
- return sliced
- end
-
def self.resume(task, columns, count, &control)
task_reports = yield(task, columns, count)
next_config_diff = {}
return next_config_diff
end
- def self.create_client(task)
- transport = ::Elasticsearch::Transport::Transport::HTTP::Faraday.new(
- {
- hosts: task['nodes'].map{ |node| Hash[node.map{ |k, v| [k.to_sym, v] }] },
- options: {
- reload_connections: task['reload_connections'],
- reload_on_failure: task['reload_on_failure'],
- retry_on_failure: task['retry_on_failure'],
- transport_options: {
- request: { timeout: task['request_timeout'] }
- }
- }
- }
- )
-
- ::Elasticsearch::Client.new transport: transport
- end
-
def init
@queries = task['slice_queries'][@index]
Embulk.logger.info("this thread queries => #{@queries}")
- @client = self.class.create_client(task)
+ @client = Connection.create_client(task)
@index_name = task['index']
@index_type = task['index_type']
@per_size = task['per_size']
@limit_size = task['limit_size']
@fields = task['fields']
@sort = task['sort']
@add_query_to_record = task['add_query_to_record']
end
def run
- @queries.each do |query|
- query_count = 0
- no_source_results = search(@index_type, query, 0, 0, @routing, @fields, @sort)
- total_count = [no_source_results['hits']['total'], @limit_size].compact.min
- while true
- now_results_size = query_count * @per_size
- next_results_size = (query_count + 1) * @per_size
- size = get_size(next_results_size, now_results_size ,total_count)
- break if size == 0
-
- results = get_sources(search(@index_type, query, size, now_results_size, @routing, @fields, @sort), @fields)
- results.each do |record|
- if @add_query_to_record
- record << query
- end
- page_builder.add(record)
- end
- break if last_query?(next_results_size ,total_count)
- query_count += 1
- end
- end
+ search(@index_type, @per_size, @routing, @fields, @sort)
page_builder.finish
task_report = {}
return task_report
end
private
- def convert_value(value, field)
- return nil if value.nil?
- case field["type"]
- when "string"
- value
- when "long"
- value.to_i
- when "double"
- value.to_f
- when "boolean"
- if value.is_a?(TrueClass) || value.is_a?(FalseClass)
- value
- else
- downcased_val = value.downcase
- case downcased_val
- when 'true' then true
- when 'false' then false
- when '1' then true
- when '0' then false
- else nil
- end
- end
- when "timestamp"
- Time.parse(value)
- when "json"
- value
- else
- raise "Unsupported type #{field['type']}"
+ def search(type, size, routing, fields, sort)
+ @queries.each do |query|
+ search_with_query(query, type, size, routing, fields, sort)
end
end
- def get_size(next_results_size, now_results_size ,total_count)
- if last_query?(next_results_size ,total_count)
- (total_count - now_results_size)
- else
- @per_size
+ def search_with_query(query, type, size, routing, fields, sort)
+ search_option = get_search_option(type, query, size, fields, sort)
+ Embulk.logger.info("#{search_option}")
+ r = @client.search(search_option)
+ i = 0
+ get_sources(r, fields).each do |result|
+ result_proc(result, query)
+ return if @limit_size == (i += 1)
end
+
+ while r = @client.scroll(scroll_id: r['_scroll_id'], scroll: '1m') and (not r['hits']['hits'].empty?) do
+ get_sources(r, fields).each do |result|
+ result_proc(result, query)
+ return if @limit_size == (i += 1)
+ end
+ end
end
- def last_query?(next_results_size ,total_count)
- next_results_size > total_count
+ def result_proc(result, query)
+ if @add_query_to_record
+ result << query
+ end
+ page_builder.add(result)
end
- def search(type, query, size, from, routing, fields, sort)
- body = { from: from }
- body[:size] = size unless size.nil?
+ def get_search_option(type, query, size, fields, sort)
+ body = { }
+ body[:query] = { query_string: { query: query } } unless query.nil?
if sort
sorts = []
sort.each do |k, v|
sorts << { k => v }
end
body[:sort] = sorts
+ else
+ body[:sort] = ["_doc"]
end
- body[:query] = { query_string: { query: query } } unless query.nil?
- search_option = { index: @index_name, type: type, body: body }
- search_option[:routing] = routing unless routing.nil?
+ search_option = { index: @index_name, type: type, scroll: '1m', body: body, size: size }
search_option[:_source] = fields.select{ |field| !field['metadata'] }.map { |field| field['name'] }.join(',')
- Embulk.logger.info(%Q{search_option => #{search_option}})
- @client.search(search_option)
+ search_option
end
def get_sources(results, fields)
hits = results['hits']['hits']
hits.map { |hit|
@@ -188,9 +128,40 @@
}
@fields.map { |field|
convert_value(result[field['name']], field)
}
}
+ end
+
+ def convert_value(value, field)
+ return nil if value.nil?
+ case field["type"]
+ when "string"
+ value
+ when "long"
+ value.to_i
+ when "double"
+ value.to_f
+ when "boolean"
+ if value.is_a?(TrueClass) || value.is_a?(FalseClass)
+ value
+ else
+ downcased_val = value.downcase
+ case downcased_val
+ when 'true' then true
+ when 'false' then false
+ when '1' then true
+ when '0' then false
+ else nil
+ end
+ end
+ when "timestamp"
+ Time.parse(value)
+ when "json"
+ value
+ else
+ raise "Unsupported type #{field['type']}"
+ end
end
end
end
end
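Beyond the module extraction, the substantive change in 0.3.0 is that paging switches from repeated from/size searches to the scroll API, with sort defaulting to ["_doc"] when none is configured. Stripped of the plugin plumbing, the fetch pattern used by the new search_with_query amounts to the following sketch (host, index, query and the page size are illustrative placeholders):

require 'elasticsearch'

client = ::Elasticsearch::Client.new(hosts: [{ host: 'localhost', port: 9200 }])

# The initial search opens a scroll context; per_size becomes the page size
# and '1m' is the keep-alive between pages.
r = client.search(index: 'my_index', scroll: '1m', size: 100,
                  body: { query: { query_string: { query: 'tag:a' } }, sort: ['_doc'] })

fetched = 0
loop do
  hits = r['hits']['hits']
  break if hits.empty?
  hits.each do |hit|
    # the plugin maps hit['_source'] through convert_value and page_builder.add here
    fetched += 1
  end
  # Each scroll call returns the next page and renews the keep-alive;
  # limit_size (when configured) short-circuits this loop early in the plugin.
  r = client.scroll(scroll_id: r['_scroll_id'], scroll: '1m')
end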