lib/searchkick/index.rb in searchkick-2.0.2 vs lib/searchkick/index.rb in searchkick-2.0.3

- old
+ new

@@ -36,10 +36,14 @@
     def settings
       client.indices.get_settings index: name
     end
 
+    def update_settings(settings)
+      client.indices.put_settings index: name, body: settings
+    end
+
     def promote(new_name)
       old_indices =
         begin
           client.indices.get_alias(name: name).keys
         rescue Elasticsearch::Transport::Transport::Errors::NotFound
@@ -185,11 +189,11 @@
       response["hits"]["total"]
     end
 
     # https://gist.github.com/jarosan/3124884
     # http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/
-    def reindex_scope(scope, import: true, resume: false, retain: false)
+    def reindex_scope(scope, import: true, resume: false, retain: false, async: false)
       if resume
         index_name = all_indices.sort.last
         raise Searchkick::Error, "No index to resume" unless index_name
         index = Searchkick::Index.new(index_name)
       else
@@ -199,72 +203,93 @@
       end
 
       # check if alias exists
       if alias_exists?
         # import before promotion
-        index.import_scope(scope, resume: resume) if import
+        index.import_scope(scope, resume: resume, async: async, full: true) if import
 
         # get existing indices to remove
-        promote(index.name)
-        clean_indices unless retain
+        unless async
+          promote(index.name)
+          clean_indices unless retain
+        end
       else
         delete if exists?
         promote(index.name)
 
         # import after promotion
-        index.import_scope(scope, resume: resume) if import
+        index.import_scope(scope, resume: resume, async: async, full: true) if import
       end
 
-      index.refresh
-
-      true
+      if async
+        {index_name: index.name}
+      else
+        index.refresh
+        true
+      end
     end
 
-    def import_scope(scope, resume: false, method_name: nil)
+    def import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false)
       batch_size = @options[:batch_size] || 1000
 
       # use scope for import
       scope = scope.search_import if scope.respond_to?(:search_import)
-      if scope.respond_to?(:find_in_batches)
+
+      if batch
+        import_or_update scope.to_a, method_name, async
+        Searchkick.redis.srem(batches_key, batch_id) if batch_id && Searchkick.redis
+      elsif full && async
+        # TODO expire Redis key
+        primary_key = scope.primary_key
+        starting_id = scope.minimum(primary_key)
+        max_id = scope.maximum(primary_key)
+        batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil
+
+        batches_count.times do |i|
+          batch_id = i + 1
+          min_id = starting_id + (i * batch_size)
+          Searchkick::BulkReindexJob.perform_later(
+            class_name: scope.model_name.name,
+            min_id: min_id,
+            max_id: min_id + batch_size - 1,
+            index_name: name,
+            batch_id: batch_id
+          )
+          Searchkick.redis.sadd(batches_key, batch_id) if Searchkick.redis
+        end
+      elsif scope.respond_to?(:find_in_batches)
         if resume
           # use total docs instead of max id since there's not a great way
           # to get the max _id without scripting since it's a string
           # TODO use primary key and prefix with table name
           scope = scope.where("id > ?", total_docs)
         end
+
+        scope = scope.select("id").except(:includes, :preload) if async
+
         scope.find_in_batches batch_size: batch_size do |batch|
-          import_or_update batch.select(&:should_index?), method_name
+          import_or_update batch, method_name, async
         end
       else
         # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb
        # use cursor for Mongoid
         items = []
         # TODO add resume
         scope.all.each do |item|
-          items << item if item.should_index?
+          items << item
           if items.length == batch_size
-            import_or_update items, method_name
+            import_or_update items, method_name, async
             items = []
           end
         end
-        import_or_update items, method_name
+        import_or_update items, method_name, async
       end
     end
 
-    def import_or_update(records, method_name)
-      retries = 0
-      begin
-        method_name ? bulk_update(records, method_name) : import(records)
-      rescue Faraday::ClientError => e
-        if retries < 1
-          retries += 1
-          retry
-        end
-        raise e
-      end
+    def batches_left
+      Searchkick.redis.scard(batches_key) if Searchkick.redis
     end
 
     # other
 
     def tokens(text, options = {})
@@ -370,8 +395,37 @@
           cast_big_decimal(v)
         end
       else
         obj
       end
+    end
+
+    def import_or_update(records, method_name, async)
+      if records.any?
+        if async
+          Searchkick::BulkReindexJob.perform_later(
+            class_name: records.first.class.name,
+            record_ids: records.map(&:id),
+            index_name: name,
+            method_name: method_name ? method_name.to_s : nil
+          )
+        else
+          retries = 0
+          records = records.select(&:should_index?)
+          begin
+            method_name ? bulk_update(records, method_name) : import(records)
+          rescue Faraday::ClientError => e
+            if retries < 1
+              retries += 1
+              retry
+            end
+            raise e
+          end
+        end
+      end
+    end
+
+    def batches_key
+      "searchkick:reindex:#{name}:batches"
     end
   end
 end