lib/lhm/chunker.rb in lhm-1.0.0.rc2 vs lib/lhm/chunker.rb in lhm-1.0.0.rc3

- old
+ new

@@ -1,77 +1,96 @@ -# -# Copyright (c) 2011, SoundCloud Ltd., Rany Keddo, Tobias Bielohlawek, Tobias -# Schmidt -# +# Copyright (c) 2011, SoundCloud Ltd., Rany Keddo, Tobias Bielohlawek, Tobias +# Schmidt -require 'lhm/migration' require 'lhm/command' +require 'lhm/sql_helper' module Lhm class Chunker include Command + include SqlHelper - # + attr_reader :connection + # Copy from origin to destination in chunks of size `stride`. Sleeps for # `throttle` milliseconds between each stride. - # - - def initialize(migration, limit = 1, connection = nil, options = {}) + def initialize(migration, connection = nil, options = {}) + @migration = migration + @connection = connection @stride = options[:stride] || 40_000 @throttle = options[:throttle] || 100 - @limit = limit - @connection = connection - @migration = migration + @start = options[:start] || select_start + @limit = options[:limit] || select_limit end - # - # Copies chunks of size `stride`, starting from id 1 up to id `limit`. - # - - def up_to(limit) - traversable_chunks_up_to(limit).times do |n| - yield(bottom(n + 1), top(n + 1, limit)) + # Copies chunks of size `stride`, starting from `start` up to id `limit`. + def up_to(&block) + 1.upto(traversable_chunks_size) do |n| + yield(bottom(n), top(n)) end end - def traversable_chunks_up_to(limit) - (limit / @stride.to_f).ceil + def traversable_chunks_size + @limit && @start ? ((@limit - @start + 1) / @stride.to_f).ceil : 0 end def bottom(chunk) - (chunk - 1) * @stride + 1 + (chunk - 1) * @stride + @start end - def top(chunk, limit) - [chunk * @stride, limit].min + def top(chunk) + [chunk * @stride + @start - 1, @limit].min end def copy(lowest, highest) - "insert ignore into `#{ @migration.destination.name }` (#{ cols.joined }) " + - "select #{ cols.joined } from `#{ @migration.origin.name }` " + + "insert ignore into `#{ destination_name }` (#{ columns }) " + + "select #{ columns } from `#{ origin_name }` " + "where `id` between #{ lowest } and #{ highest }" end + def select_start + start = connection.select_value("select min(id) from #{ origin_name }") + start ? start.to_i : nil + end + + def select_limit + limit = connection.select_value("select max(id) from #{ origin_name }") + limit ? limit.to_i : nil + end + + def throttle_seconds + @throttle / 1000.0 + end + private - def cols - @cols ||= @migration.intersection + def destination_name + @migration.destination.name end + def origin_name + @migration.origin.name + end + + def columns + @columns ||= @migration.intersection.joined + end + + def validate + if @start && @limit && @start > @limit + error("impossible chunk options (limit must be greater than start)") + end + end + def execute - up_to(@limit) do |lowest, highest| + up_to do |lowest, highest| affected_rows = update(copy(lowest, highest)) if affected_rows > 0 sleep(throttle_seconds) end print "." end end - - def throttle_seconds - @throttle / 1000.0 - end end end -