lib/esse/import/bulk.rb in esse-0.2.6 vs lib/esse/import/bulk.rb in esse-0.3.0

- old
+ new

@@ -1,35 +1,40 @@
 module Esse
   module Import
     class Bulk
-      def initialize(type: nil, index: nil, delete: nil, create: nil)
+      def initialize(type: nil, index: nil, delete: nil, create: nil, update: nil)
         @index = Array(index).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
           value = doc.to_bulk
           value[:_type] ||= type if type
           { index: value }
         end
         @create = Array(create).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
           value = doc.to_bulk
           value[:_type] ||= type if type
           { create: value }
         end
+        @update = Array(update).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
+          value = doc.to_bulk(operation: :update)
+          value[:_type] ||= type if type
+          { update: value }
+        end
         @delete = Array(delete).select(&method(:valid_doc?)).reject(&:ignore_on_delete?).map do |doc|
           value = doc.to_bulk(data: false)
           value[:_type] ||= type if type
           { delete: value }
         end
       end

       # Return an array of RequestBody instances
       #
       # In case of timeout error, will retry with an exponential backoff using the following formula:
-      # wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1)) seconds. It will retry up to max_retries times that is default 3.
+      # wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1)) seconds. It will retry up to max_retries times that is default 4.
       #
       # Too large bulk requests will be split into multiple requests with only one attempt.
       #
       # @yield [RequestBody] A request body instance
-      def each_request(max_retries: 3)
+      def each_request(max_retries: 4, last_retry_in_small_chunks: true)
         # @TODO create indexes when by checking all the index suffixes (if mapping is not empty)
         requests = [optimistic_request]
         retry_count = 0

         begin
@@ -41,10 +46,12 @@
             end
           end
         rescue Faraday::TimeoutError, Esse::Transport::RequestTimeoutError => e
           retry_count += 1
           raise Esse::Transport::RequestTimeoutError.new(e.message) if retry_count >= max_retries
+          # Timeout error may be caused by a too large request, so we split the requests in small chunks as a last attempt
+          requests = requests_in_small_chunks if last_retry_in_small_chunks && max_retries > 2 && retry_count == max_retries - 2
           wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1))
           Esse.logger.warn "Timeout error, retrying in #{wait_interval} seconds"
           sleep(wait_interval)
           retry
         rescue Esse::Transport::RequestEntityTooLargeError => e
@@ -65,19 +72,33 @@
         Esse.document?(doc)
       end

       def optimistic_request
         request = Import::RequestBodyAsJson.new
-        request.delete = @delete
         request.create = @create
         request.index = @index
+        request.update = @update
+        request.delete = @delete
         request
       end

+      def requests_in_small_chunks(chunk_size: 1)
+        arr = []
+        @create.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.create = slice } }
+        @index.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.index = slice } }
+        @update.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.update = slice } }
+        @delete.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.delete = slice } }
+        Esse.logger.warn <<~MSG
+          Retrying the last request in small chunks of #{chunk_size} documents.
+          This is a last resort to avoid timeout errors, consider increasing the bulk size or reducing the batch size.
+        MSG
+        arr
+      end
+
       # @return [Array<RequestBody>]
       def balance_requests_size(err)
         if (bulk_size = err.message.scan(/exceeded.(\d+).bytes/).dig(0, 0).to_i) > 0
-          requests = (@delete + @create + @index).each_with_object([Import::RequestBodyRaw.new]) do |as_json, result|
+          requests = (@create + @index + @update + @delete).each_with_object([Import::RequestBodyRaw.new]) do |as_json, result|
             operation, meta = as_json.to_a.first
             meta = meta.dup
             data = meta.delete(:data)
             piece = MultiJson.dump(operation => meta)
             piece << "\n" << MultiJson.dump(data) if data
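Taken together, the 0.3.0 changes add update documents as a fourth bulk operation and a chunked fallback for timeouts. A minimal usage sketch, assuming hypothetical document collections (docs_to_index, docs_to_update, docs_to_delete) that respond to the interface the constructor filters on (to_bulk, ignore_on_index?, ignore_on_delete?); only the keyword arguments and the yielded RequestBody come from the diff above:

    # Sketch only: the collection names and the transport call inside the block
    # are assumptions for illustration, not esse API.
    bulk = Esse::Import::Bulk.new(
      index: docs_to_index,
      update: docs_to_update,  # new in 0.3.0, serialized via doc.to_bulk(operation: :update)
      delete: docs_to_delete
    )

    bulk.each_request(max_retries: 4, last_retry_in_small_chunks: true) do |request_body|
      # the caller sends each yielded RequestBody as one bulk API call
    end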
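The retry comment spells out the backoff exactly: rand(10) yields 0..9, so with the new default of max_retries: 4 the method sleeps at most three times before re-raising, and the retry_count == max_retries - 2 check swaps in the single-document chunks from requests_in_small_chunks just before the second sleep. Worked out:

    # wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1))
    # retry_count 1  ->  1 + 15 + (0..9) * 2  =  16..34 seconds
    # retry_count 2  -> 16 + 15 + (0..9) * 3  =  31..58 seconds  (requests split into small chunks here)
    # retry_count 3  -> 81 + 15 + (0..9) * 4  =  96..132 seconds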