lib/searchkick/index.rb in searchkick-1.3.3 vs lib/searchkick/index.rb in searchkick-1.3.4

- old
+ new

@@ -138,67 +138,102 @@ index = Searchkick::Index.new("#{name}_#{Time.now.strftime('%Y%m%d%H%M%S%L')}", @options) index.create(index_options) index end - # remove old indices that start w/ index_name - def clean_indices - all_indices = + def all_indices(options = {}) + indices = begin client.indices.get_aliases rescue Elasticsearch::Transport::Transport::Errors::NotFound {} end - indices = all_indices.select { |k, v| (v.empty? || v["aliases"].empty?) && k =~ /\A#{Regexp.escape(name)}_\d{14,17}\z/ }.keys + indices = indices.select { |k, v| v.empty? || v["aliases"].empty? } if options[:unaliased] + indices.select { |k, v| k =~ /\A#{Regexp.escape(name)}_\d{14,17}\z/ }.keys + end + + # remove old indices that start w/ index_name + def clean_indices + indices = all_indices(unaliased: true) indices.each do |index| Searchkick::Index.new(index).delete end indices end + def total_docs + response = + client.search( + index: name, + body: { + fields: [], + query: {match_all: {}}, + size: 0 + } + ) + + response["hits"]["total"] + end + # https://gist.github.com/jarosan/3124884 # http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/ def reindex_scope(scope, options = {}) skip_import = options[:import] == false + resume = options[:resume] - clean_indices + if resume + index_name = all_indices.sort.last + raise Searchkick::Error, "No index to resume" unless index_name + index = Searchkick::Index.new(index_name) + else + clean_indices - index = create_index(index_options: scope.searchkick_index_options) + index = create_index(index_options: scope.searchkick_index_options) + end # check if alias exists if alias_exists? # import before swap - index.import_scope(scope) unless skip_import + index.import_scope(scope, resume: resume) unless skip_import # get existing indices to remove swap(index.name) clean_indices else delete if exists? swap(index.name) # import after swap - index.import_scope(scope) unless skip_import + index.import_scope(scope, resume: resume) unless skip_import end index.refresh true end - def import_scope(scope) + def import_scope(scope, options = {}) batch_size = @options[:batch_size] || 1000 # use scope for import scope = scope.search_import if scope.respond_to?(:search_import) if scope.respond_to?(:find_in_batches) + if options[:resume] + # use total docs instead of max id since there's not a great way + # to get the max _id without scripting since it's a string + + # TODO use primary key and prefix with table name + scope = scope.where("id > ?", total_docs) + end + scope.find_in_batches batch_size: batch_size do |batch| import batch.select(&:should_index?) end else # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb # use cursor for Mongoid items = [] + # TODO add resume scope.all.each do |item| items << item if item.should_index? if items.length == batch_size import items items = []