lib/searchkick/index.rb in searchkick-2.0.2 vs lib/searchkick/index.rb in searchkick-2.0.3
- old
+ new
@@ -36,10 +36,14 @@
# Returns the result of a get_settings request for this index.
def settings
client.indices.get_settings(index: name)
end
+ # Sends +settings+ as the body of a put_settings request for this index.
+ def update_settings(settings)
+ client.indices.put_settings(index: name, body: settings)
+ end
+
def promote(new_name)
old_indices =
begin
client.indices.get_alias(name: name).keys
rescue Elasticsearch::Transport::Transport::Errors::NotFound
@@ -185,11 +189,11 @@
response["hits"]["total"]
end
# https://gist.github.com/jarosan/3124884
# http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/
- def reindex_scope(scope, import: true, resume: false, retain: false)
+ def reindex_scope(scope, import: true, resume: false, retain: false, async: false)
if resume
index_name = all_indices.sort.last
raise Searchkick::Error, "No index to resume" unless index_name
index = Searchkick::Index.new(index_name)
else
@@ -199,72 +203,93 @@
end
# check if alias exists
if alias_exists?
# import before promotion
- index.import_scope(scope, resume: resume) if import
+ index.import_scope(scope, resume: resume, async: async, full: true) if import
# get existing indices to remove
- promote(index.name)
- clean_indices unless retain
+ unless async
+ promote(index.name)
+ clean_indices unless retain
+ end
else
delete if exists?
promote(index.name)
# import after promotion
- index.import_scope(scope, resume: resume) if import
+ index.import_scope(scope, resume: resume, async: async, full: true) if import
end
- index.refresh
-
- true
+ if async
+ {index_name: index.name}
+ else
+ index.refresh
+ true
+ end
end
- def import_scope(scope, resume: false, method_name: nil)
+ # Imports the records of +scope+ into this index.
+ #
+ # scope       - model class or relation; replaced by scope.search_import when it responds to it
+ # resume      - when true (find_in_batches path only), restricts the scope to ids greater than total_docs
+ # method_name - passed through to import_or_update
+ # async       - passed through to import_or_update; together with full: true the work is split into jobs
+ # batch       - when true, imports scope.to_a as one batch and then removes batch_id from the Redis set
+ # batch_id    - id removed from the Redis set (batches_key) once a batch: true import has run
+ # full        - with async: true, enqueues one Searchkick::BulkReindexJob per batch_size-wide primary key range
+ def import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false)
batch_size = @options[:batch_size] || 1000
# use scope for import
scope = scope.search_import if scope.respond_to?(:search_import)
- if scope.respond_to?(:find_in_batches)
+
+ if batch
+ import_or_update scope.to_a, method_name, async
+ Searchkick.redis.srem(batches_key, batch_id) if batch_id && Searchkick.redis
+ elsif full && async
+ # TODO expire Redis key
+ primary_key = scope.primary_key
+ starting_id = scope.minimum(primary_key)
+ # minimum yields nil when the scope has no rows (ActiveRecord behavior); nothing to enqueue then
+ unless starting_id.nil?
+ max_id = scope.maximum(primary_key)
+ batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil
+
+ batches_count.times do |i|
+ batch_id = i + 1
+ min_id = starting_id + (i * batch_size)
+ # register the batch before enqueueing: a job that finishes first would
+ # run its srem ahead of this sadd and leave a stale member in the set
+ Searchkick.redis.sadd(batches_key, batch_id) if Searchkick.redis
+ Searchkick::BulkReindexJob.perform_later(
+ class_name: scope.model_name.name,
+ min_id: min_id,
+ max_id: min_id + batch_size - 1,
+ index_name: name,
+ batch_id: batch_id
+ )
+ end
+ end
+ elsif scope.respond_to?(:find_in_batches)
if resume
# use total docs instead of max id since there's not a great way
# to get the max _id without scripting since it's a string
# TODO use primary key and prefix with table name
scope = scope.where("id > ?", total_docs)
end
+ scope = scope.select("id").except(:includes, :preload) if async
+
scope.find_in_batches batch_size: batch_size do |batch|
- import_or_update batch.select(&:should_index?), method_name
+ import_or_update batch, method_name, async
end
else
# https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb
# use cursor for Mongoid
items = []
# TODO add resume
scope.all.each do |item|
- items << item if item.should_index?
+ items << item
if items.length == batch_size
- import_or_update items, method_name
+ import_or_update items, method_name, async
items = []
end
end
- import_or_update items, method_name
+ import_or_update items, method_name, async
end
end
- def import_or_update(records, method_name)
- retries = 0
- begin
- method_name ? bulk_update(records, method_name) : import(records)
- rescue Faraday::ClientError => e
- if retries < 1
- retries += 1
- retry
- end
- raise e
- end
+ # Size (SCARD) of the Redis set stored under batches_key; nil when Searchkick.redis is not set.
+ def batches_left
+ return unless Searchkick.redis
+
+ Searchkick.redis.scard(batches_key)
end
# other
def tokens(text, options = {})
@@ -370,8 +395,37 @@
cast_big_decimal(v)
end
else
obj
end
+ end
+
+ # Indexes +records+ either inline or through a background job.
+ #
+ # records     - collection of model instances; nothing happens unless records.any?
+ # method_name - when set, records go through bulk_update with this method instead of import
+ # async       - when truthy, only the record ids are handed to Searchkick::BulkReindexJob
+ #
+ # The inline path keeps only records answering true to should_index? and
+ # retries once on Faraday::ClientError before re-raising.
+ def import_or_update(records, method_name, async)
+ return unless records.any?
+
+ if async
+ job_method = method_name ? method_name.to_s : nil
+ Searchkick::BulkReindexJob.perform_later(
+ class_name: records.first.class.name,
+ record_ids: records.map(&:id),
+ index_name: name,
+ method_name: job_method
+ )
+ else
+ attempts = 0
+ indexable = records.select(&:should_index?)
+ begin
+ method_name ? bulk_update(indexable, method_name) : import(indexable)
+ rescue Faraday::ClientError => e
+ # a single retry, then give up
+ raise e if attempts >= 1
+ attempts += 1
+ retry
+ end
+ end
+ end
+
+ # Name of the Redis set, scoped to this index's name, that import_scope
+ # fills via sadd/srem and batches_left counts via scard.
+ def batches_key
+ "searchkick:reindex:#{name}:batches"
end
end
end