lib/metacrunch/db/reader.rb in metacrunch-3.0.3 vs lib/metacrunch/db/reader.rb in metacrunch-3.1.0
- old
+ new
@@ -1,24 +1,42 @@
+require "metacrunch/db"
+
module Metacrunch
class Db::Reader
+ include Metacrunch::ParallelProcessableReader
def initialize(database_connection_or_url, dataset_proc, options = {})
@rows_per_fetch = options.delete(:rows_per_fetch) || 1000
@db = if database_connection_or_url.is_a?(String)
Sequel.connect(database_connection_or_url, options)
else
database_connection_or_url
end
- @dataset = dataset_proc.call(@db)
+ @dataset = dataset_proc.call(@db).unlimited
+ @total_numbers_of_records = @dataset.count
+
+ unless @dataset.opts[:order]
+ raise ArgumentError, "Metacrunch::Db::Reader requires the dataset be ordered."
+ end
end
def each(&block)
return enum_for(__method__) unless block_given?
- @dataset.paged_each(rows_per_fetch: @rows_per_fetch) do |row|
- yield(row)
+ @db.transaction do
+ offset = (-number_of_processes * @rows_per_fetch) + (process_index * @rows_per_fetch)
+
+ loop do
+ offset = offset + (number_of_processes * @rows_per_fetch)
+
+ @dataset.limit(@rows_per_fetch).offset(offset).each do |row|
+ yield(row)
+ end
+
+ break if offset + @rows_per_fetch >= @total_numbers_of_records
+ end
end
self
end