lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.2.1 vs lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.3.0

- line removed in 0.3.0 (old, 0.2.1)
+ line added in 0.3.0 (new)

@@ -1,13 +1,16 @@
 require 'excon'
 require 'elasticsearch'
+require_relative 'elasticsearch/connection'
+require_relative 'elasticsearch/input_thread'
 
 module Embulk
   module Input
 
     class Elasticsearch < InputPlugin
       Plugin.register_input("elasticsearch", self)
+      ADD_QUERY_TO_RECORD_KEY = 'query'
 
       def self.transaction(config, &control)
         task = {
           "nodes" => config.param("nodes", :array),
           "request_timeout" => config.param("request_timeout", :integer, default: 60),
@@ -23,162 +26,99 @@
           "sort" => config.param("sort", :hash, default: nil),
           "add_query_to_record" => config.param("add_query_to_record", :bool, default: false)
         }
 
         # TODO: want max_threads
         define_num_threads = config.param("num_threads", :integer, default: 1)
-        task['slice_queries'] = get_slice_from_num_threads(task['queries'], define_num_threads)
+        task['slice_queries'] = InputThread.get_slice_from_num_threads(task['queries'], define_num_threads)
 
         columns = []
         task['fields'].each_with_index{ |field, i|
           columns << Column.new(i, field['name'], field['type'].to_sym)
         }
         if task['add_query_to_record']
-          columns << Column.new(task['fields'].size, "query", :string)
+          columns << Column.new(task['fields'].size, ADD_QUERY_TO_RECORD_KEY, :string)
        end
 
         resume(task, columns, task['slice_queries'].size, &control)
       end
 
-      def self.get_slice_from_num_threads(array, define_num_threads)
-        num_threads = array.size < define_num_threads ? array.size : define_num_threads
-        per_queries = if (array.size % num_threads) == 0
-                        (array.size / num_threads)
-                      else
-                        (array.size / num_threads) + 1
-                      end
-        sliced = array.each_slice(per_queries).to_a
-        Embulk.logger.info("calculate num threads => #{sliced.size}")
-        return sliced
-      end
-
       def self.resume(task, columns, count, &control)
         task_reports = yield(task, columns, count)
 
         next_config_diff = {}
         return next_config_diff
       end
 
-      def self.create_client(task)
-        transport = ::Elasticsearch::Transport::Transport::HTTP::Faraday.new(
-          {
-            hosts: task['nodes'].map{ |node| Hash[node.map{ |k, v| [k.to_sym, v] }] },
-            options: {
-              reload_connections: task['reload_connections'],
-              reload_on_failure: task['reload_on_failure'],
-              retry_on_failure: task['retry_on_failure'],
-              transport_options: {
-                request: { timeout: task['request_timeout'] }
-              }
-            }
-          }
-        )
-
-        ::Elasticsearch::Client.new transport: transport
-      end
-
       def init
         @queries = task['slice_queries'][@index]
         Embulk.logger.info("this thread queries => #{@queries}")
-        @client = self.class.create_client(task)
+        @client = Connection.create_client(task)
         @index_name = task['index']
         @index_type = task['index_type']
         @per_size = task['per_size']
         @limit_size = task['limit_size']
         @fields = task['fields']
         @sort = task['sort']
         @add_query_to_record = task['add_query_to_record']
       end
 
       def run
-        @queries.each do |query|
-          query_count = 0
-          no_source_results = search(@index_type, query, 0, 0, @routing, @fields, @sort)
-          total_count = [no_source_results['hits']['total'], @limit_size].compact.min
-          while true
-            now_results_size = query_count * @per_size
-            next_results_size = (query_count + 1) * @per_size
-            size = get_size(next_results_size, now_results_size ,total_count)
-            break if size == 0
-
-            results = get_sources(search(@index_type, query, size, now_results_size, @routing, @fields, @sort), @fields)
-            results.each do |record|
-              if @add_query_to_record
-                record << query
-              end
-              page_builder.add(record)
-            end
-            break if last_query?(next_results_size ,total_count)
-            query_count += 1
-          end
-        end
+        search(@index_type, @per_size, @routing, @fields, @sort)
         page_builder.finish
 
         task_report = {}
         return task_report
       end
 
       private
 
-      def convert_value(value, field)
-        return nil if value.nil?
-        case field["type"]
-        when "string"
-          value
-        when "long"
-          value.to_i
-        when "double"
-          value.to_f
-        when "boolean"
-          if value.is_a?(TrueClass) || value.is_a?(FalseClass)
-            value
-          else
-            downcased_val = value.downcase
-            case downcased_val
-            when 'true' then true
-            when 'false' then false
-            when '1' then true
-            when '0' then false
-            else nil
-            end
-          end
-        when "timestamp"
-          Time.parse(value)
-        when "json"
-          value
-        else
-          raise "Unsupported type #{field['type']}"
+      def search(type, size, routing, fields, sort)
+        @queries.each do |query|
+          search_with_query(query, type, size, routing, fields, sort)
         end
       end
 
-      def get_size(next_results_size, now_results_size ,total_count)
-        if last_query?(next_results_size ,total_count)
-          (total_count - now_results_size)
-        else
-          @per_size
+      def search_with_query(query, type, size, routing, fields, sort)
+        search_option = get_search_option(type, query, size, fields, sort)
+        Embulk.logger.info("#{search_option}")
+        r = @client.search(search_option)
+        i = 0
+        get_sources(r, fields).each do |result|
+          result_proc(result, query)
+          return if @limit_size == (i += 1)
         end
+
+        while r = @client.scroll(scroll_id: r['_scroll_id'], scroll: '1m') and (not r['hits']['hits'].empty?) do
+          get_sources(r, fields).each do |result|
+            result_proc(result, query)
+            return if @limit_size == (i += 1)
+          end
+        end
       end
 
-      def last_query?(next_results_size ,total_count)
-        next_results_size > total_count
+      def result_proc(result, query)
+        if @add_query_to_record
+          result << query
+        end
+        page_builder.add(result)
       end
 
-      def search(type, query, size, from, routing, fields, sort)
-        body = { from: from }
-        body[:size] = size unless size.nil?
+      def get_search_option(type, query, size, fields, sort)
+        body = { }
+        body[:query] = { query_string: { query: query } } unless query.nil?
         if sort
           sorts = []
           sort.each do |k, v|
             sorts << { k => v }
           end
           body[:sort] = sorts
+        else
+          body[:sort] = ["_doc"]
         end
-        body[:query] = { query_string: { query: query } } unless query.nil?
-        search_option = { index: @index_name, type: type, body: body }
-        search_option[:routing] = routing unless routing.nil?
+        search_option = { index: @index_name, type: type, scroll: '1m', body: body, size: size }
         search_option[:_source] = fields.select{ |field| !field['metadata'] }.map { |field| field['name'] }.join(',')
-        Embulk.logger.info(%Q{search_option => #{search_option}})
-        @client.search(search_option)
+        search_option
       end
 
       def get_sources(results, fields)
         hits = results['hits']['hits']
         hits.map { |hit|
@@ -188,9 +128,40 @@
           }
           @fields.map { |field| convert_value(result[field['name']], field) }
         }
+      end
+
+      def convert_value(value, field)
+        return nil if value.nil?
+        case field["type"]
+        when "string"
+          value
+        when "long"
+          value.to_i
+        when "double"
+          value.to_f
+        when "boolean"
+          if value.is_a?(TrueClass) || value.is_a?(FalseClass)
+            value
+          else
+            downcased_val = value.downcase
+            case downcased_val
+            when 'true' then true
+            when 'false' then false
+            when '1' then true
+            when '0' then false
+            else nil
+            end
+          end
+        when "timestamp"
+          Time.parse(value)
+        when "json"
+          value
+        else
+          raise "Unsupported type #{field['type']}"
+        end
       end
     end
   end
 end
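The most significant behavioral change above is how results are fetched: 0.2.1 paged through hits with from/size offsets, while 0.3.0 opens a scroll context (scroll: '1m', sorted by _doc when no sort is configured) and keeps calling the scroll API until no hits remain, which avoids the cost of deep pagination on large result sets. The snippet below is a minimal standalone sketch of that scroll loop against the elasticsearch gem directly, not the plugin's actual code; the host, index name, query, page size, and limit are assumptions for illustration.

require 'elasticsearch'

# Sketch of the scroll-based fetch that search_with_query performs in 0.3.0.
# Host, index, query, size, and limit are assumed values for illustration only.
client = Elasticsearch::Client.new(hosts: [{ host: 'localhost', port: 9200 }])

# Open a scroll context; sorting by _doc is the cheapest order for a full scan.
response = client.search(
  index: 'logs',
  scroll: '1m',
  size: 100,
  body: { query: { query_string: { query: '*' } }, sort: ['_doc'] }
)

limit = 1_000  # plays the role of the plugin's limit_size setting
fetched = 0
loop do
  hits = response['hits']['hits']
  break if hits.empty? || fetched >= limit

  hits.each do |hit|
    # The plugin converts _source fields here and hands each row to page_builder.
    puts hit['_source']
    fetched += 1
  end

  # Pull the next batch; each call keeps the scroll context alive for another minute.
  response = client.scroll(scroll_id: response['_scroll_id'], scroll: '1m')
end

Unlike from/size paging, a scroll cursor reads from a point-in-time view of the index, which suits a bulk-export plugin that wants a consistent snapshot of each query's results.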