lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.1.0 vs lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.2.0
- old
+ new
@@ -20,19 +20,34 @@
"limit_size" => config.param("limit_size", :integer, default: nil),
"fields" => config.param("fields", :array, default: nil),
"queries" => config.param("queries", :array),
"sort" => config.param("sort", :hash, default: nil)
}
+ # TODO: want max_threads
+ define_num_threads = config.param("num_threads", :integer, default: 1)
+ task['slice_queries'] = get_slice_from_num_threads(task['queries'], define_num_threads)
columns = []
task['fields'].each_with_index{ |field, i|
columns << Column.new(i, field['name'], field['type'].to_sym)
}
- resume(task, columns, 1, &control)
+ resume(task, columns, task['slice_queries'].size, &control)
end
# Splits the query list into groups so that each group can be processed by
# one task.
#
# The number of groups never exceeds +define_num_threads+ or the number of
# queries. Every group except possibly the last holds the same number of
# queries, so fewer groups than requested may be produced (for example,
# 5 queries with 4 threads yields groups of 2, 2 and 1).
#
# @param array [Array] queries to distribute
# @param define_num_threads [Integer] requested upper bound on the number of
#   groups; values below 1 are treated as 1
# @return [Array<Array>] the groups, in the original order; empty when
#   +array+ is empty
def self.get_slice_from_num_threads(array, define_num_threads)
  # Clamp to at least one thread: an empty query list or a non-positive
  # setting would otherwise reach the division below as zero (or hand
  # each_slice a non-positive size) and raise.
  num_threads = [[define_num_threads, array.size].min, 1].max
  # Ceiling division keeps the group count at or below num_threads; the
  # lower bound of 1 keeps each_slice valid for an empty list.
  per_queries = [(array.size + num_threads - 1) / num_threads, 1].max
  sliced = array.each_slice(per_queries).to_a
  Embulk.logger.info("calculate num threads => #{sliced.size}")
  sliced
end
+
def self.resume(task, columns, count, &control)
task_reports = yield(task, columns, count)
next_config_diff = {}
return next_config_diff
@@ -55,14 +70,15 @@
::Elasticsearch::Client.new transport: transport
end
# Copies the task settings this instance works with into instance variables
# and builds the client through self.class.create_client.
def init
# Queries handled by this instance: the entry of task['slice_queries'] at
# position @index. @index is not assigned in this method; presumably it is the
# task index supplied by the plugin framework (TODO confirm).
+ @queries = task['slice_queries'][@index]
+ Embulk.logger.info("this thread queries => #{@queries}")
@client = self.class.create_client(task)
# The index name is stored in @index_name rather than @index, so this method
# no longer overwrites the @index value read above.
- @index = task['index']
+ @index_name = task['index']
@index_type = task['index_type']
- @queries = task['queries']
@per_size = task['per_size']
@limit_size = task['limit_size']
@fields = task['fields']
@sort = task['sort']
end
@@ -146,10 +162,10 @@
sorts << { k => v }
end
body[:sort] = sorts
end
body[:query] = { query_string: { query: query } } unless query.nil?
- search_option = { index: @index, type: type, body: body }
+ search_option = { index: @index_name, type: type, body: body }
search_option[:routing] = routing unless routing.nil?
search_option[:_source] = fields.select{ |field| !field['metadata'] }.map { |field| field['name'] }.join(',')
Embulk.logger.info(%Q{search_option => #{search_option}})
@client.search(search_option)
end