Sha256: 066b48b0b0f07d280d4064594aa212f97319751b78c242833163ce0e900d02fe

Contents?: true

Size: 1.57 KB

Versions: 5

Compression:

Stored size: 1.57 KB

Contents

module Lhm
  # Walks the id range of a migration's origin table and hands it out as
  # consecutive, non-overlapping [lowest, highest] id windows, each wrapped
  # in a ChunkInsert.
  #
  # The range boundaries come from +options[:start]+ / +options[:limit]+ when
  # given, otherwise from +min(id)+ / +max(id)+ of the origin table. Both are
  # nil when the table has no rows and no explicit bounds were supplied.
  class ChunkFinder
    LOG_PREFIX = "Chunker"

    # Running row estimate maintained by #each_chunk: it grows by the
    # throttler's stride for every chunk yielded, so the final (possibly
    # shorter) chunk is counted as a full stride.
    attr_reader :processed_rows

    # @param migration  object responding to +origin_name+ (the source table)
    # @param connection object responding to +select_value+; only optional in
    #   the signature -- it is required whenever a bound has to be read from
    #   the database or chunks are iterated
    # @param options [Hash] recognised keys:
    #   :start     - first id to copy (default: min(id) of the origin table)
    #   :limit     - last id to copy  (default: max(id) of the origin table)
    #   :throttler - object responding to +stride+ (rows per chunk); needed
    #                by #each_chunk
    def initialize(migration, connection = nil, options = {})
      @migration = migration
      @connection = connection
      @start = options[:start] || select_start_from_db
      @limit = options[:limit] || select_limit_from_db
      @throttler = options[:throttler]
      @processed_rows = 0
    end

    # @return [Boolean] true when neither bound is known, i.e. the origin
    #   table returned no min/max id and none was passed in via options
    def table_empty?
      start.nil? && limit.nil?
    end

    # Checks that the configured id window is usable.
    #
    # An empty table has nothing to validate, so it is accepted rather than
    # failing on a nil comparison.
    #
    # @raise [ArgumentError] when start is greater than limit
    # @return [nil]
    def validate
      return if table_empty?

      # NOTE: message text is kept verbatim; callers may match on it.
      if start > limit
        raise ArgumentError, "impossible chunk options (limit (#{limit.inspect} must be greater than start (#{start.inspect})"
      end
    end

    # Yields one ChunkInsert per id window, in ascending id order, until the
    # limit has been covered. Resets and then maintains #processed_rows.
    #
    # Without a block an Enumerator over the same chunks is returned. For an
    # empty table nothing is yielded.
    #
    # @yieldparam chunk [ChunkInsert] built from (migration, connection,
    #   lowest id, highest id) of the window
    # @return [Enumerator, nil]
    def each_chunk
      return enum_for(:each_chunk) unless block_given?

      @processed_rows = 0
      return if table_empty?

      next_id = @start
      while next_id <= @limit
        top = upper_id(next_id)
        # Accounting happens before the yield so the block observes the
        # count including the chunk it is about to handle.
        @processed_rows += @throttler.stride
        yield ChunkInsert.new(@migration, @connection, next_id, top)
        next_id = top + 1
      end
    end

    # Size of the id window (inclusive on both ends). This is an upper bound
    # on the row count, since ids inside the window may be missing.
    #
    # @return [Integer] 0 for an empty table
    def max_rows
      return 0 if table_empty?

      @limit - @start + 1
    end

    private

    attr_reader :start, :limit

    # Smallest id currently in the origin table (nil when it has no rows).
    def select_start_from_db
      @connection.select_value("select min(id) from `#{ @migration.origin_name }`")
    end

    # Largest id currently in the origin table (nil when it has no rows).
    def select_limit_from_db
      @connection.select_value("select max(id) from `#{ @migration.origin_name }`")
    end

    # Finds the highest id of the chunk that begins at +next_id+: the id of
    # the stride-th existing row at or after +next_id+, clamped to the limit.
    # When fewer than +stride+ rows remain the query returns nothing and the
    # limit itself closes the final chunk.
    #
    # NOTE(review): +should_retry+ / +log_prefix+ are passed straight to the
    # connection; their handling is defined by the connection object.
    def upper_id(next_id)
      sql = "select id from `#{ @migration.origin_name }` where id >= #{ next_id } order by id limit 1 offset #{ @throttler.stride - 1}"
      top = @connection.select_value(sql, should_retry: true, log_prefix: LOG_PREFIX)

      [top ? top.to_i : @limit, @limit].min
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
lhm-teak-3.6.4 lib/lhm/chunk_finder.rb
lhm-teak-3.6.3 lib/lhm/chunk_finder.rb
lhm-teak-3.6.2 lib/lhm/chunk_finder.rb
lhm-teak-3.6.1 lib/lhm/chunk_finder.rb
lhm-teak-3.6.0 lib/lhm/chunk_finder.rb