lib/search_flip/bulk.rb in search_flip-2.0.0.beta4 vs lib/search_flip/bulk.rb in search_flip-2.0.0.beta5
- old
+ new
@@ -12,11 +12,11 @@
# end
class Bulk
class Error < StandardError; end
- attr_accessor :url, :count, :options, :http_client, :ignore_errors
+ attr_reader :url, :options, :ignore_errors
# Builds and yields a new Bulk object, ie initiates the buffer, yields,
# sends batches of records each time the buffer is full, and sends a final
# batch after the yielded code returns and there are still documents
# present within the buffer.
@@ -30,28 +30,33 @@
# SearchFlip::Bulk.new "http://127.0.0.1:9200/index/type/_bulk", 1_000, ignore_errors: [409] do |bulk|
# # ...
# end
#
# @param url [String] The endpoint to send bulk requests to
- # @param count [Fixnum] The maximum number of documents per bulk request
# @param options [Hash] Options for the bulk requests
+ # @option options batch_size [Fixnum] The maximum number of documents per bulk
+ # request
# @option options ignore_errors [Array, Fixnum] Errors that should be
# ignored. If you eg want to ignore errors resulting from conflicts,
# you can specify to ignore 409 here.
# @option options raise [Boolean] If you want the bulk requests to never
# raise any exceptions (fire and forget), you can pass false here.
# Default is true.
# @option options http_client [SearchFlip::HTTPClient] An optional http
# client instance
- def initialize(url, count = 1_000, options = {})
- self.url = url
- self.count = count
- self.options = options
- self.http_client = options[:http_client] || SearchFlip::HTTPClient.new
- self.ignore_errors = Array(options[:ignore_errors]).to_set if options[:ignore_errors]
+ def initialize(url, options = {})
+ @url = url
+ @options = options
+ @http_client = options[:http_client] || SearchFlip::HTTPClient.new
+ @ignore_errors = Array(options[:ignore_errors]).to_set if options[:ignore_errors]
+ @bulk_limit = options[:bulk_limit] || SearchFlip::Config[:bulk_limit]
+ @bulk_max_mb = options[:bulk_max_mb] || SearchFlip::Config[:bulk_max_mb]
+
+ @bulk_max_bytes = @bulk_max_mb * 1024 * 1024
+
init
yield self
upload if @num > 0
@@ -115,11 +120,11 @@
@num = 0
end
def upload
response =
- http_client
+ @http_client
.headers(accept: "application/json", content_type: "application/x-ndjson")
.put(url, body: @payload, params: ignore_errors ? {} : { filter_path: "errors" })
return if options[:raise] == false
@@ -139,20 +144,24 @@
ensure
init
end
def perform(action, id, json = nil, options = {})
- @payload << SearchFlip::JSON.generate(action => options.merge(_id: id))
- @payload << "\n"
+ new_payload = SearchFlip::JSON.generate(action => options.merge(_id: id))
+ new_payload << "\n"
if json
- @payload << json
- @payload << "\n"
+ new_payload << json
+ new_payload << "\n"
end
+ upload if @num > 0 && @payload.bytesize + new_payload.bytesize >= @bulk_max_bytes
+
+ @payload << new_payload
+
@num += 1
- upload if @num >= count
+ upload if @num >= @bulk_limit || @payload.bytesize >= @bulk_max_bytes
end
end
end