lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.3.2 vs lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.3.3
- old
+ new
@@ -1,9 +1,8 @@
-require 'excon'
-require 'elasticsearch'
require_relative 'elasticsearch/connection'
require_relative 'elasticsearch/input_thread'
+require_relative 'elasticsearch/converter'
module Embulk
module Input
class Elasticsearch < InputPlugin
@@ -50,137 +49,26 @@
end
def init
@queries = task['slice_queries'][@index]
Embulk.logger.info("this thread queries => #{@queries}")
- @client = Connection.create_client(task)
- @index_name = task['index']
- @index_type = task['index_type']
- @per_size = task['per_size']
- @limit_size = task['limit_size']
- @fields = task['fields']
- @sort = task['sort']
@add_query_to_record = task['add_query_to_record']
- @scroll = task['scroll']
- @retry_on_failure = task['retry_on_failure']
+ @connection = Connection.new(task)
end
def run
- search(@index_type, @per_size, @routing, @fields, @sort)
+ @queries.each do |query|
+ @connection.search_with_query(query) { |result|
+ if @add_query_to_record
+ result << query
+ end
+ page_builder.add(result)
+ }
+ end
page_builder.finish
task_report = {}
return task_report
- end
-
- private
-
- def search(type, size, routing, fields, sort)
- @queries.each do |query|
- search_with_query(query, type, size, routing, fields, sort)
- end
- end
-
- def search_with_query(query, type, size, routing, fields, sort)
- search_option = get_search_option(type, query, size, fields, sort)
- Embulk.logger.info("#{search_option}")
- r = search_with_retry { @client.search(search_option) }
- i = 0
- get_sources(r, fields).each do |result|
- result_proc(result, query)
- return if @limit_size == (i += 1)
- end
-
- while r = (search_with_retry { @client.scroll(scroll_id: r['_scroll_id'], scroll: @scroll) }) and (not r['hits']['hits'].empty?) do
- get_sources(r, fields).each do |result|
- result_proc(result, query)
- return if @limit_size == (i += 1)
- end
- end
- end
-
- def search_with_retry
- retries = 0
- begin
- yield if block_given?
- rescue => e
- if retries < @retry_on_failure
- retries += 1
- Embulk.logger.warn "Could not search to Elasticsearch, resetting connection and trying again. #{e.message}"
- sleep 2**retries
- retry
- end
- Embulk.logger.error "Could not search to Elasticsearch after #{retries} retries. #{e.message}"
- raise
- end
- end
-
- def result_proc(result, query)
- if @add_query_to_record
- result << query
- end
- page_builder.add(result)
- end
-
- def get_search_option(type, query, size, fields, sort)
- body = { }
- body[:query] = { query_string: { query: query } } unless query.nil?
- if sort
- sorts = []
- sort.each do |k, v|
- sorts << { k => v }
- end
- body[:sort] = sorts
- else
- body[:sort] = ["_doc"]
- end
- search_option = { index: @index_name, type: type, scroll: @scroll, body: body, size: size }
- search_option[:_source] = fields.select{ |field| !field['metadata'] }.map { |field| field['name'] }.join(',')
- search_option
- end
-
- def get_sources(results, fields)
- hits = results['hits']['hits']
- hits.map { |hit|
- result = hit['_source']
- fields.select{ |field| field['metadata'] }.each { |field|
- result[field['name']] = hit[field['name']]
- }
- @fields.map { |field|
- convert_value(result[field['name']], field)
- }
- }
- end
-
- def convert_value(value, field)
- return nil if value.nil?
- case field["type"]
- when "string"
- value
- when "long"
- value.to_i
- when "double"
- value.to_f
- when "boolean"
- if value.is_a?(TrueClass) || value.is_a?(FalseClass)
- value
- else
- downcased_val = value.downcase
- case downcased_val
- when 'true' then true
- when 'false' then false
- when '1' then true
- when '0' then false
- else nil
- end
- end
- when "timestamp"
- Time.parse(value)
- when "json"
- value
- else
- raise "Unsupported type #{field['type']}"
- end
end
end
end
end