Sha256: 7e2aa208e2853ae444faa34b23763c2608249ad74e6e4f00f05acf32997b281d

Contents?: true

Size: 1.16 KB

Versions: 2

Compression:

Stored size: 1.16 KB

Contents

require "metacrunch/db"

module Metacrunch
  # Reads the rows of an ordered Sequel dataset page by page and yields each
  # row to the caller.
  #
  # Paging uses +limit+/+offset+, so the dataset must carry an explicit order.
  # Pages are interleaved according to +process_index+ and
  # +number_of_processes+, which are not defined in this class.
  # NOTE(review): they are presumably provided by
  # Metacrunch::ParallelProcessableReader — confirm.
  class Db::Reader
    include Metacrunch::ParallelProcessableReader

    # @param database_connection_or_url [String, Object] a connection URL
    #   (passed to +Sequel.connect+) or an already established database object,
    #   which is used as-is.
    # @param dataset_proc [#call] called with the database object; must return
    #   an ordered dataset.
    # @param options [Hash] +:rows_per_fetch+ (default 1000) sets the page
    #   size; every other entry is forwarded to +Sequel.connect+ when a URL is
    #   given.
    # @raise [ArgumentError] if the dataset returned by +dataset_proc+ has no
    #   order.
    def initialize(database_connection_or_url, dataset_proc, options = {})
      # Work on a copy so that removing :rows_per_fetch does not mutate the
      # hash owned by the caller.
      options = options.dup
      @rows_per_fetch = options.delete(:rows_per_fetch) || 1000

      @db =
        if database_connection_or_url.is_a?(String)
          Sequel.connect(database_connection_or_url, options)
        else
          database_connection_or_url
        end

      # Any limit set by the caller is dropped; paging is controlled here.
      @dataset = dataset_proc.call(@db).unlimited

      # Validate before counting so an unordered dataset is rejected without
      # issuing a COUNT query first.
      unless @dataset.opts[:order]
        raise ArgumentError, "Metacrunch::Db::Reader requires the dataset be ordered."
      end

      # The total is captured once here and is not refreshed by #each.
      @total_numbers_of_records = @dataset.count
    end

    # Yields every row of the pages assigned to this reader, inside a single
    # database transaction.
    #
    # @yieldparam row [Object] one row as produced by the dataset.
    # @return [Enumerator] when no block is given.
    # @return [self] when a block is given.
    def each(&block)
      return enum_for(__method__) unless block_given?

      @db.transaction do
        # Distance between two consecutive pages handled by this reader.
        stride = number_of_processes * @rows_per_fetch
        # First page of this reader; later pages follow every +stride+ rows.
        offset = process_index * @rows_per_fetch

        loop do
          @dataset.limit(@rows_per_fetch).offset(offset).each(&block)

          # Stop once the page just read reaches the recorded end of the data.
          break if offset + @rows_per_fetch >= @total_numbers_of_records

          offset += stride
        end
      end

      self
    end

  end
end

Version data entries

2 entries across 2 versions & 1 rubygem

Version Path
metacrunch-3.1.1 lib/metacrunch/db/reader.rb
metacrunch-3.1.0 lib/metacrunch/db/reader.rb