lib/searchkick/index.rb in searchkick-1.3.3 vs lib/searchkick/index.rb in searchkick-1.3.4
- old
+ new
@@ -138,67 +138,102 @@
index = Searchkick::Index.new("#{name}_#{Time.now.strftime('%Y%m%d%H%M%S%L')}", @options)
index.create(index_options)
index
end
- # remove old indices that start w/ index_name
- def clean_indices
- all_indices =
+ def all_indices(options = {})
+ indices =
begin
client.indices.get_aliases
rescue Elasticsearch::Transport::Transport::Errors::NotFound
{}
end
- indices = all_indices.select { |k, v| (v.empty? || v["aliases"].empty?) && k =~ /\A#{Regexp.escape(name)}_\d{14,17}\z/ }.keys
+ indices = indices.select { |k, v| v.empty? || v["aliases"].empty? } if options[:unaliased]
+ indices.select { |k, v| k =~ /\A#{Regexp.escape(name)}_\d{14,17}\z/ }.keys
+ end
+
+ # remove old indices that start w/ index_name
+ def clean_indices
+ indices = all_indices(unaliased: true)
indices.each do |index|
Searchkick::Index.new(index).delete
end
indices
end
+ def total_docs
+ response =
+ client.search(
+ index: name,
+ body: {
+ fields: [],
+ query: {match_all: {}},
+ size: 0
+ }
+ )
+
+ response["hits"]["total"]
+ end
+
# https://gist.github.com/jarosan/3124884
# http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/
def reindex_scope(scope, options = {})
skip_import = options[:import] == false
+ resume = options[:resume]
- clean_indices
+ if resume
+ index_name = all_indices.sort.last
+ raise Searchkick::Error, "No index to resume" unless index_name
+ index = Searchkick::Index.new(index_name)
+ else
+ clean_indices
- index = create_index(index_options: scope.searchkick_index_options)
+ index = create_index(index_options: scope.searchkick_index_options)
+ end
# check if alias exists
if alias_exists?
# import before swap
- index.import_scope(scope) unless skip_import
+ index.import_scope(scope, resume: resume) unless skip_import
# get existing indices to remove
swap(index.name)
clean_indices
else
delete if exists?
swap(index.name)
# import after swap
- index.import_scope(scope) unless skip_import
+ index.import_scope(scope, resume: resume) unless skip_import
end
index.refresh
true
end
- def import_scope(scope)
+ def import_scope(scope, options = {})
batch_size = @options[:batch_size] || 1000
# use scope for import
scope = scope.search_import if scope.respond_to?(:search_import)
if scope.respond_to?(:find_in_batches)
+ if options[:resume]
+ # use total docs instead of max id since there's not a great way
+ # to get the max _id without scripting since it's a string
+
+ # TODO use primary key and prefix with table name
+ scope = scope.where("id > ?", total_docs)
+ end
+
scope.find_in_batches batch_size: batch_size do |batch|
import batch.select(&:should_index?)
end
else
# https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb
# use cursor for Mongoid
items = []
+ # TODO add resume
scope.all.each do |item|
items << item if item.should_index?
if items.length == batch_size
import items
items = []