lib/search_flip/bulk.rb in search_flip-2.0.0.beta4 vs lib/search_flip/bulk.rb in search_flip-2.0.0.beta5

- old
+ new

@@ -12,11 +12,11 @@ # end class Bulk class Error < StandardError; end - attr_accessor :url, :count, :options, :http_client, :ignore_errors + attr_reader :url, :options, :ignore_errors # Builds and yields a new Bulk object, ie initiates the buffer, yields, # sends batches of records each time the buffer is full, and sends a final # batch after the yielded code returns and there are still documents # present within the buffer. @@ -30,28 +30,33 @@ # SearchFlip::Bulk.new "http://127.0.0.1:9200/index/type/_bulk", 1_000, ignore_errors: [409] do |bulk| # # ... # end # # @param url [String] The endpoint to send bulk requests to - # @param count [Fixnum] The maximum number of documents per bulk request # @param options [Hash] Options for the bulk requests + # @option options batch_size [Fixnum] The maximum number of documents per bulk + # request # @option options ignore_errors [Array, Fixnum] Errors that should be # ignored. If you eg want to ignore errors resulting from conflicts, # you can specify to ignore 409 here. # @option options raise [Boolean] If you want the bulk requests to never # raise any exceptions (fire and forget), you can pass false here. # Default is true. # @option options http_client [SearchFlip::HTTPClient] An optional http # client instance - def initialize(url, count = 1_000, options = {}) - self.url = url - self.count = count - self.options = options - self.http_client = options[:http_client] || SearchFlip::HTTPClient.new - self.ignore_errors = Array(options[:ignore_errors]).to_set if options[:ignore_errors] + def initialize(url, options = {}) + @url = url + @options = options + @http_client = options[:http_client] || SearchFlip::HTTPClient.new + @ignore_errors = Array(options[:ignore_errors]).to_set if options[:ignore_errors] + @bulk_limit = options[:bulk_limit] || SearchFlip::Config[:bulk_limit] + @bulk_max_mb = options[:bulk_max_mb] || SearchFlip::Config[:bulk_max_mb] + + @bulk_max_bytes = @bulk_max_mb * 1024 * 1024 + init yield self upload if @num > 0 @@ -115,11 +120,11 @@ @num = 0 end def upload response = - http_client + @http_client .headers(accept: "application/json", content_type: "application/x-ndjson") .put(url, body: @payload, params: ignore_errors ? {} : { filter_path: "errors" }) return if options[:raise] == false @@ -139,20 +144,24 @@ ensure init end def perform(action, id, json = nil, options = {}) - @payload << SearchFlip::JSON.generate(action => options.merge(_id: id)) - @payload << "\n" + new_payload = SearchFlip::JSON.generate(action => options.merge(_id: id)) + new_payload << "\n" if json - @payload << json - @payload << "\n" + new_payload << json + new_payload << "\n" end + upload if @num > 0 && @payload.bytesize + new_payload.bytesize >= @bulk_max_bytes + + @payload << new_payload + @num += 1 - upload if @num >= count + upload if @num >= @bulk_limit || @payload.bytesize >= @bulk_max_bytes end end end