lib/esse/import/bulk.rb in esse-0.2.6 vs lib/esse/import/bulk.rb in esse-0.3.0
- old
+ new
@@ -1,35 +1,40 @@
module Esse
module Import
class Bulk
- def initialize(type: nil, index: nil, delete: nil, create: nil)
+ def initialize(type: nil, index: nil, delete: nil, create: nil, update: nil)
@index = Array(index).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk
value[:_type] ||= type if type
{ index: value }
end
@create = Array(create).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk
value[:_type] ||= type if type
{ create: value }
end
+ @update = Array(update).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
+ value = doc.to_bulk(operation: :update)
+ value[:_type] ||= type if type
+ { update: value }
+ end
@delete = Array(delete).select(&method(:valid_doc?)).reject(&:ignore_on_delete?).map do |doc|
value = doc.to_bulk(data: false)
value[:_type] ||= type if type
{ delete: value }
end
end
# Return an array of RequestBody instances
#
# In case of a timeout error, it will retry with an exponential backoff using the following formula:
- # wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1)) seconds. It will retry up to max_retries times, which defaults to 3.
+ # wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1)) seconds. It will retry up to max_retries times, which defaults to 4.
#
# Bulk requests that are too large will be split into multiple requests with only one attempt.
#
# @yield [RequestBody] A request body instance
- def each_request(max_retries: 3)
+ def each_request(max_retries: 4, last_retry_in_small_chunks: true)
# @TODO Create indexes by checking all the index suffixes (if the mapping is not empty)
requests = [optimistic_request]
retry_count = 0
begin
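
Note: the 0.3.0 constructor adds an update: collection alongside index:, create: and delete:, built with the same select/reject/map pipeline but calling to_bulk(operation: :update). A minimal sketch of that pipeline in isolation, assuming docs is some enumerable of Esse document instances (the variable name is illustrative, not part of this file):

    # Drop anything that is not an Esse document or is flagged to be skipped,
    # then build one { update: ... } bulk entry per remaining document.
    update_entries = Array(docs)
      .select { |doc| Esse.document?(doc) }   # same check as valid_doc?
      .reject(&:ignore_on_index?)
      .map { |doc| { update: doc.to_bulk(operation: :update) } }
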
@@ -41,10 +46,12 @@
end
end
rescue Faraday::TimeoutError, Esse::Transport::RequestTimeoutError => e
retry_count += 1
raise Esse::Transport::RequestTimeoutError.new(e.message) if retry_count >= max_retries
+ # A timeout error may be caused by a request that is too large, so we split the requests into small chunks as a last attempt
+ requests = requests_in_small_chunks if last_retry_in_small_chunks && max_retries > 2 && retry_count == max_retries - 2
wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1))
Esse.logger.warn "Timeout error, retrying in #{wait_interval} seconds"
sleep(wait_interval)
retry
rescue Esse::Transport::RequestEntityTooLargeError => e
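
Note: the backoff here is the formula documented in the comment above; ignoring the rand(10) jitter it grows as roughly 16s, 31s and 96s over the first three retries. With the new default of max_retries: 4 and last_retry_in_small_chunks: true, the pending requests are rebuilt as one-document chunks once retry_count reaches max_retries - 2, i.e. after the second timeout. A quick sketch of the arithmetic, with the jitter pinned to 0 for clarity:

    # Wait intervals for the documented backoff formula (jitter term set to 0).
    (1..3).each do |retry_count|
      wait_interval = (retry_count**4) + 15 + (0 * (retry_count + 1))
      puts "retry #{retry_count}: #{wait_interval}s" # prints 16s, 31s, 96s
    end
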
@@ -65,19 +72,33 @@
Esse.document?(doc)
end
def optimistic_request
request = Import::RequestBodyAsJson.new
- request.delete = @delete
request.create = @create
request.index = @index
+ request.update = @update
+ request.delete = @delete
request
end
+ def requests_in_small_chunks(chunk_size: 1)
+ arr = []
+ @create.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.create = slice } }
+ @index.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.index = slice } }
+ @update.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.update = slice } }
+ @delete.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.delete = slice } }
+ Esse.logger.warn <<~MSG
+ Retrying the last request in small chunks of #{chunk_size} documents.
+ This is a last resort to avoid timeout errors; consider increasing the bulk size or reducing the batch size.
+ MSG
+ arr
+ end
+
# @return [Array<RequestBody>]
def balance_requests_size(err)
if (bulk_size = err.message.scan(/exceeded.(\d+).bytes/).dig(0, 0).to_i) > 0
- requests = (@delete + @create + @index).each_with_object([Import::RequestBodyRaw.new]) do |as_json, result|
+ requests = (@create + @index + @update + @delete).each_with_object([Import::RequestBodyRaw.new]) do |as_json, result|
operation, meta = as_json.to_a.first
meta = meta.dup
data = meta.delete(:data)
piece = MultiJson.dump(operation => meta)
piece << "\n" << MultiJson.dump(data) if data
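
Note: balance_requests_size extracts the byte limit from the RequestEntityTooLargeError message using the scan/dig shown above, then rebuilds the pending operations as raw, newline-delimited JSON bodies (the remainder of the method is not shown in this diff). A small illustration of the parsing step only; the message text below is hypothetical, shaped just to match the regexp:

    message = "413 Request Entity Too Large: exceeded 104857600 bytes"
    bulk_size = message.scan(/exceeded.(\d+).bytes/).dig(0, 0).to_i
    bulk_size # => 104857600
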