lib/metacrunch/db/reader.rb in metacrunch-3.0.3 vs lib/metacrunch/db/reader.rb in metacrunch-3.1.0

- old
+ new

@@ -1,24 +1,42 @@
+require "metacrunch/db"
+
 module Metacrunch
   class Db::Reader
+    include Metacrunch::ParallelProcessableReader
 
     def initialize(database_connection_or_url, dataset_proc, options = {})
       @rows_per_fetch = options.delete(:rows_per_fetch) || 1000
 
       @db = if database_connection_or_url.is_a?(String)
         Sequel.connect(database_connection_or_url, options)
       else
         database_connection_or_url
       end
 
-      @dataset = dataset_proc.call(@db)
+      @dataset = dataset_proc.call(@db).unlimited
+      @total_numbers_of_records = @dataset.count
+
+      unless @dataset.opts[:order]
+        raise ArgumentError, "Metacrunch::Db::Reader requires the dataset be ordered."
+      end
     end
 
     def each(&block)
       return enum_for(__method__) unless block_given?
 
-      @dataset.paged_each(rows_per_fetch: @rows_per_fetch) do |row|
-        yield(row)
+      @db.transaction do
+        offset = (-number_of_processes * @rows_per_fetch) + (process_index * @rows_per_fetch)
+
+        loop do
+          offset = offset + (number_of_processes * @rows_per_fetch)
+
+          @dataset.limit(@rows_per_fetch).offset(offset).each do |row|
+            yield(row)
+          end
+
+          break if offset + @rows_per_fetch >= @total_numbers_of_records
+        end
       end
 
       self
     end