lib/lhm/chunker.rb in lhm-2.0.0 vs lib/lhm/chunker.rb in lhm-2.1.0

- old
+ new

@@ -1,10 +1,10 @@ # Copyright (c) 2011 - 2013, SoundCloud Ltd., Rany Keddo, Tobias Bielohlawek, Tobias # Schmidt - require 'lhm/command' require 'lhm/sql_helper' +require 'lhm/printer' module Lhm class Chunker include Command include SqlHelper @@ -14,59 +14,76 @@ # Copy from origin to destination in chunks of size `stride`. Sleeps for # `throttle` milliseconds between each stride. def initialize(migration, connection = nil, options = {}) @migration = migration @connection = connection - @stride = options[:stride] || 40_000 - @throttle = options[:throttle] || 100 + @throttler = options[:throttler] @start = options[:start] || select_start @limit = options[:limit] || select_limit + @printer = options[:printer] || Printer::Percentage.new end - # Copies chunks of size `stride`, starting from `start` up to id `limit`. - def up_to(&block) - 1.upto(traversable_chunks_size) do |n| - yield(bottom(n), top(n)) + def execute + return unless @start && @limit + @next_to_insert = @start + until @next_to_insert >= @limit + stride = @throttler.stride + affected_rows = @connection.update(copy(bottom, top(stride))) + + if @throttler && affected_rows > 0 + @throttler.run + end + + @printer.notify(bottom, @limit) + @next_to_insert = top(stride) + 1 end + @printer.end end - def traversable_chunks_size - @limit && @start ? ((@limit - @start + 1) / @stride.to_f).ceil : 0 - end + private - def bottom(chunk) - (chunk - 1) * @stride + @start + def bottom + @next_to_insert end - def top(chunk) - [chunk * @stride + @start - 1, @limit].min + def top(stride) + [(@next_to_insert + stride - 1), @limit].min end def copy(lowest, highest) "insert ignore into `#{ destination_name }` (#{ columns }) " + "select #{ select_columns } from `#{ origin_name }` " + - "#{ conditions } #{ origin_name }.`id` between #{ lowest } and #{ highest }" + "#{ conditions } `#{ origin_name }`.`id` between #{ lowest } and #{ highest }" end def select_start - start = connection.select_value("select min(id) from #{ origin_name }") + start = connection.select_value("select min(id) from `#{ origin_name }`") start ? start.to_i : nil end def select_limit - limit = connection.select_value("select max(id) from #{ origin_name }") + limit = connection.select_value("select max(id) from `#{ origin_name }`") limit ? limit.to_i : nil end - def throttle_seconds - @throttle / 1000.0 - end - - private - + #XXX this is extremely brittle and doesn't work when filter contains more + #than one SQL clause, e.g. "where ... group by foo". Before making any + #more changes here, please consider either: + # + #1. Letting users only specify part of defined clauses (i.e. don't allow + #`filter` on Migrator to accept both WHERE and INNER JOIN + #2. Changing query building so that it uses structured data rather than + #strings until the last possible moment. def conditions - @migration.conditions ? "#{@migration.conditions} and" : "where" + if @migration.conditions + @migration.conditions. + sub(/\)\Z/, ""). + #put any where conditions in parens + sub(/where\s(\w.*)\Z/, "where (\\1)") + " and" + else + "where" + end end def destination_name @migration.destination.name end @@ -78,28 +95,15 @@ def columns @columns ||= @migration.intersection.joined end def select_columns - @select_columns ||= @migration.intersection.typed(origin_name) + @select_columns ||= @migration.intersection.typed("`#{origin_name}`") end def validate if @start && @limit && @start > @limit error("impossible chunk options (limit must be greater than start)") end - end - - def execute - up_to do |lowest, highest| - affected_rows = @connection.update(copy(lowest, highest)) - - if affected_rows > 0 - sleep(throttle_seconds) - end - - print "." - end - print "\n" end end end