lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.1.0 vs lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.2.0

- old
+ new

@@ -20,19 +20,34 @@
     "limit_size" => config.param("limit_size", :integer, default: nil),
     "fields" => config.param("fields", :array, default: nil),
     "queries" => config.param("queries", :array),
     "sort" => config.param("sort", :hash, default: nil)
   }
+  # TODO: want max_threads
+  define_num_threads = config.param("num_threads", :integer, default: 1)
+  task['slice_queries'] = get_slice_from_num_threads(task['queries'], define_num_threads)
   columns = []
   task['fields'].each_with_index{ |field, i|
     columns << Column.new(i, field['name'], field['type'].to_sym)
   }
-  resume(task, columns, 1, &control)
+  resume(task, columns, task['slice_queries'].size, &control)
 end
+def self.get_slice_from_num_threads(array, define_num_threads)
+  num_threads = array.size < define_num_threads ? array.size : define_num_threads
+  per_queries = if (array.size % num_threads) == 0
+    (array.size / num_threads)
+  else
+    (array.size / num_threads) + 1
+  end
+  sliced = array.each_slice(per_queries).to_a
+  Embulk.logger.info("calculate num threads => #{sliced.size}")
+  return sliced
+end
+
 def self.resume(task, columns, count, &control)
   task_reports = yield(task, columns, count)
   next_config_diff = {}
   return next_config_diff
@@ -55,14 +70,15 @@
   ::Elasticsearch::Client.new transport: transport
 end
 def init
+  @queries = task['slice_queries'][@index]
+  Embulk.logger.info("this thread queries => #{@queries}")
   @client = self.class.create_client(task)
-  @index = task['index']
+  @index_name = task['index']
   @index_type = task['index_type']
-  @queries = task['queries']
   @per_size = task['per_size']
   @limit_size = task['limit_size']
   @fields = task['fields']
   @sort = task['sort']
 end
@@ -146,10 +162,10 @@
       sorts << { k => v }
     end
     body[:sort] = sorts
   end
   body[:query] = { query_string: { query: query } } unless query.nil?
-  search_option = { index: @index, type: type, body: body }
+  search_option = { index: @index_name, type: type, body: body }
   search_option[:routing] = routing unless routing.nil?
   search_option[:_source] = fields.select{ |field| !field['metadata'] }.map { |field| field['name'] }.join(',')
   Embulk.logger.info(%Q{search_option => #{search_option}})
   @client.search(search_option)
 end