lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.3.2 vs lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.3.3

- old
+ new

@@ -1,9 +1,8 @@ -require 'excon' -require 'elasticsearch' require_relative 'elasticsearch/connection' require_relative 'elasticsearch/input_thread' +require_relative 'elasticsearch/converter' module Embulk module Input class Elasticsearch < InputPlugin @@ -50,137 +49,26 @@ end def init @queries = task['slice_queries'][@index] Embulk.logger.info("this thread queries => #{@queries}") - @client = Connection.create_client(task) - @index_name = task['index'] - @index_type = task['index_type'] - @per_size = task['per_size'] - @limit_size = task['limit_size'] - @fields = task['fields'] - @sort = task['sort'] @add_query_to_record = task['add_query_to_record'] - @scroll = task['scroll'] - @retry_on_failure = task['retry_on_failure'] + @connection = Connection.new(task) end def run - search(@index_type, @per_size, @routing, @fields, @sort) + @queries.each do |query| + @connection.search_with_query(query) { |result| + if @add_query_to_record + result << query + end + page_builder.add(result) + } + end page_builder.finish task_report = {} return task_report - end - - private - - def search(type, size, routing, fields, sort) - @queries.each do |query| - search_with_query(query, type, size, routing, fields, sort) - end - end - - def search_with_query(query, type, size, routing, fields, sort) - search_option = get_search_option(type, query, size, fields, sort) - Embulk.logger.info("#{search_option}") - r = search_with_retry { @client.search(search_option) } - i = 0 - get_sources(r, fields).each do |result| - result_proc(result, query) - return if @limit_size == (i += 1) - end - - while r = (search_with_retry { @client.scroll(scroll_id: r['_scroll_id'], scroll: @scroll) }) and (not r['hits']['hits'].empty?) do - get_sources(r, fields).each do |result| - result_proc(result, query) - return if @limit_size == (i += 1) - end - end - end - - def search_with_retry - retries = 0 - begin - yield if block_given? - rescue => e - if retries < @retry_on_failure - retries += 1 - Embulk.logger.warn "Could not search to Elasticsearch, resetting connection and trying again. #{e.message}" - sleep 2**retries - retry - end - Embulk.logger.error "Could not search to Elasticsearch after #{retries} retries. #{e.message}" - raise - end - end - - def result_proc(result, query) - if @add_query_to_record - result << query - end - page_builder.add(result) - end - - def get_search_option(type, query, size, fields, sort) - body = { } - body[:query] = { query_string: { query: query } } unless query.nil? - if sort - sorts = [] - sort.each do |k, v| - sorts << { k => v } - end - body[:sort] = sorts - else - body[:sort] = ["_doc"] - end - search_option = { index: @index_name, type: type, scroll: @scroll, body: body, size: size } - search_option[:_source] = fields.select{ |field| !field['metadata'] }.map { |field| field['name'] }.join(',') - search_option - end - - def get_sources(results, fields) - hits = results['hits']['hits'] - hits.map { |hit| - result = hit['_source'] - fields.select{ |field| field['metadata'] }.each { |field| - result[field['name']] = hit[field['name']] - } - @fields.map { |field| - convert_value(result[field['name']], field) - } - } - end - - def convert_value(value, field) - return nil if value.nil? - case field["type"] - when "string" - value - when "long" - value.to_i - when "double" - value.to_f - when "boolean" - if value.is_a?(TrueClass) || value.is_a?(FalseClass) - value - else - downcased_val = value.downcase - case downcased_val - when 'true' then true - when 'false' then false - when '1' then true - when '0' then false - else nil - end - end - when "timestamp" - Time.parse(value) - when "json" - value - else - raise "Unsupported type #{field['type']}" - end end end end end