lib/lhm/chunker.rb in lhm-2.0.0 vs lib/lhm/chunker.rb in lhm-2.1.0
- old
+ new
@@ -1,10 +1,10 @@
# Copyright (c) 2011 - 2013, SoundCloud Ltd., Rany Keddo, Tobias Bielohlawek, Tobias
# Schmidt
-
require 'lhm/command'
require 'lhm/sql_helper'
+require 'lhm/printer'
module Lhm
class Chunker
include Command
include SqlHelper
@@ -14,59 +14,76 @@
# Copy from origin to destination in chunks of size `stride`. Sleeps for
# `throttle` milliseconds between each stride.
# Stores the migration and connection and reads the chunking options.
# options[:start] and options[:limit] fall back to select_start and
# select_limit when not given.
# Removed lines: options[:stride] (default 40_000) and options[:throttle]
# (default 100) were stored directly.
# Added lines: options[:throttler] is stored as given (no default) and
# options[:printer] defaults to Printer::Percentage.new.
def initialize(migration, connection = nil, options = {})
@migration = migration
@connection = connection
- @stride = options[:stride] || 40_000
- @throttle = options[:throttle] || 100
+ @throttler = options[:throttler]
@start = options[:start] || select_start
@limit = options[:limit] || select_limit
+ @printer = options[:printer] || Printer::Percentage.new
end
- # Copies chunks of size `stride`, starting from `start` up to id `limit`.
- def up_to(&block)
- 1.upto(traversable_chunks_size) do |n|
- yield(bottom(n), top(n))
+ def execute
+ return unless @start && @limit
+ @next_to_insert = @start
+ until @next_to_insert >= @limit
+ stride = @throttler.stride
+ affected_rows = @connection.update(copy(bottom, top(stride)))
+
+ if @throttler && affected_rows > 0
+ @throttler.run
+ end
+
+ @printer.notify(bottom, @limit)
+ @next_to_insert = top(stride) + 1
end
+ @printer.end
end
# Removed helper: number of chunks of @stride ids needed to cover
# @start..@limit inclusive, rounded up; 0 when either bound is nil.
- def traversable_chunks_size
- @limit && @start ? ((@limit - @start + 1) / @stride.to_f).ceil : 0
- end
+ private
# Lowest id of the chunk being copied.
# Removed version: derived from a 1-based chunk number, @stride and @start.
# Added version: returns the @next_to_insert cursor maintained by execute.
- def bottom(chunk)
- (chunk - 1) * @stride + @start
+ def bottom
+ @next_to_insert
end
# Highest id of the chunk being copied, never larger than @limit.
# Removed version: derived from a 1-based chunk number, @stride and @start.
# Added version: @next_to_insert + stride - 1, with stride passed in by the
# caller.
- def top(chunk)
- [chunk * @stride + @start - 1, @limit].min
+ def top(stride)
+ [(@next_to_insert + stride - 1), @limit].min
end
# Builds the `insert ignore ... select` statement that copies the rows of
# the origin table whose id lies between lowest and highest into the
# destination table. The fragment returned by `conditions` is placed
# directly before the id range predicate.
# The added line backtick-quotes the origin table name in that predicate.
def copy(lowest, highest)
"insert ignore into `#{ destination_name }` (#{ columns }) " +
"select #{ select_columns } from `#{ origin_name }` " +
- "#{ conditions } #{ origin_name }.`id` between #{ lowest } and #{ highest }"
+ "#{ conditions } `#{ origin_name }`.`id` between #{ lowest } and #{ highest }"
end
# Queries min(id) of the origin table and returns it as an Integer, or nil
# when the query yields no value.
# The added line backtick-quotes the table name.
def select_start
- start = connection.select_value("select min(id) from #{ origin_name }")
+ start = connection.select_value("select min(id) from `#{ origin_name }`")
start ? start.to_i : nil
end
# Queries max(id) of the origin table and returns it as an Integer, or nil
# when the query yields no value.
# The added line backtick-quotes the table name.
def select_limit
- limit = connection.select_value("select max(id) from #{ origin_name }")
+ limit = connection.select_value("select max(id) from `#{ origin_name }`")
limit ? limit.to_i : nil
end
# Removed helper: converts @throttle from milliseconds to seconds for the
# sleep call in the removed execute.
- def throttle_seconds
- @throttle / 1000.0
- end
-
- private
-
+ #XXX this is extremely brittle and doesn't work when filter contains more
+ #than one SQL clause, e.g. "where ... group by foo". Before making any
+ #more changes here, please consider either:
+ #
+ #1. Letting users only specify part of defined clauses (i.e. don't allow
+ #`filter` on Migrator to accept both WHERE and INNER JOIN).
+ #2. Changing query building so that it uses structured data rather than
+ #strings until the last possible moment.
# Returns the SQL fragment that copy places before the id range predicate.
# Without migration conditions this is the bare keyword "where".
# Removed version: the migration's conditions followed by " and".
# Added version: the migration's conditions are rewritten in two steps
# before " and" is appended:
#   1. one ")" at the very end of the string is dropped;
#   2. a trailing where clause whose text starts with a word character is
#      wrapped in parentheses.
# NOTE(review): a where clause that already starts with "(" is not matched
# by step 2 but still loses its final ")" in step 1 - confirm that callers
# never pass such a filter.
def conditions
- @migration.conditions ? "#{@migration.conditions} and" : "where"
+ if @migration.conditions
+ @migration.conditions.
+ sub(/\)\Z/, "").
+ #put any where conditions in parens
+ sub(/where\s(\w.*)\Z/, "where (\\1)") + " and"
+ else
+ "where"
+ end
end
# Name of the destination table, read from @migration.destination.
def destination_name
@migration.destination.name
end
@@ -78,28 +95,15 @@
# Memoized result of @migration.intersection.joined; copy interpolates it
# as the column list of the insert.
def columns
@columns ||= @migration.intersection.joined
end
# Memoized result of @migration.intersection.typed for the origin table;
# copy interpolates it as the select list.
# The added line passes the origin name wrapped in backticks.
def select_columns
- @select_columns ||= @migration.intersection.typed(origin_name)
+ @select_columns ||= @migration.intersection.typed("`#{origin_name}`")
end
# Reports an error through `error` when both bounds are set and @start is
# greater than @limit. Equal bounds pass this check.
def validate
if @start && @limit && @start > @limit
error("impossible chunk options (limit must be greater than start)")
end
- end
-
# Removed execute: ran the copy statement for every chunk yielded by up_to,
# slept throttle_seconds after each chunk that affected rows, and printed a
# dot per chunk followed by a final newline.
- def execute
- up_to do |lowest, highest|
- affected_rows = @connection.update(copy(lowest, highest))
-
- if affected_rows > 0
- sleep(throttle_seconds)
- end
-
- print "."
- end
- print "\n"
end
end
end