lib/lhm/chunker.rb in lhm-1.0.0.rc2 vs lib/lhm/chunker.rb in lhm-1.0.0.rc3
- old
+ new
@@ -1,77 +1,96 @@
-#
-# Copyright (c) 2011, SoundCloud Ltd., Rany Keddo, Tobias Bielohlawek, Tobias
-# Schmidt
-#
+# Copyright (c) 2011, SoundCloud Ltd., Rany Keddo, Tobias Bielohlawek, Tobias
+# Schmidt
-require 'lhm/migration'
require 'lhm/command'
+require 'lhm/sql_helper'
module Lhm
class Chunker
include Command
+ include SqlHelper
- #
+ attr_reader :connection
+
# Copy from origin to destination in chunks of size `stride`. Sleeps for
# `throttle` milliseconds between each stride.
- #
-
- def initialize(migration, limit = 1, connection = nil, options = {})
+ # Stores `migration` and `connection` and reads the chunking options:
+ # :stride (default 40_000), :throttle (default 100), :start and :limit.
+ # When :start or :limit is missing (or falsy), the value comes from
+ # `select_start` / `select_limit`, which query the origin table.
+ # NOTE(review): `connection` defaults to nil, but `select_start` and
+ # `select_limit` call `connection.select_value` - presumably callers always
+ # pass a connection or both :start and :limit; confirm.
+ def initialize(migration, connection = nil, options = {})
+ @migration = migration
+ @connection = connection
@stride = options[:stride] || 40_000
@throttle = options[:throttle] || 100
- @limit = limit
- @connection = connection
- @migration = migration
+ @start = options[:start] || select_start
+ @limit = options[:limit] || select_limit
end
- #
- # Copies chunks of size `stride`, starting from id 1 up to id `limit`.
- #
-
- def up_to(limit)
- traversable_chunks_up_to(limit).times do |n|
- yield(bottom(n + 1), top(n + 1, limit))
+ # Yields `bottom(n), top(n)` - the first and last id of chunk n - for every n
+ # from 1 to `traversable_chunks_size`. Chunks are `stride` ids wide, begin at
+ # `start` and are capped at `limit`. Nothing is yielded when the size is 0.
+ # The `&block` parameter is declared, but the block is invoked via `yield`.
+ def up_to(&block)
+ 1.upto(traversable_chunks_size) do |n|
+ yield(bottom(n), top(n))
end
end
- def traversable_chunks_up_to(limit)
- (limit / @stride.to_f).ceil
+ # Number of chunks of width @stride needed to cover the ids from @start to
+ # @limit (both included), rounded up. Returns 0 when @start or @limit is nil.
+ def traversable_chunks_size
+ @limit && @start ? ((@limit - @start + 1) / @stride.to_f).ceil : 0
end
+ # First id of chunk number `chunk` (numbered from 1, as `up_to` does):
+ # @start plus (chunk - 1) strides.
def bottom(chunk)
- (chunk - 1) * @stride + 1
+ (chunk - 1) * @stride + @start
end
- def top(chunk, limit)
- [chunk * @stride, limit].min
+ # Last id of chunk number `chunk` (numbered from 1): one less than the first
+ # id of the following chunk, but never greater than @limit.
+ def top(chunk)
+ [chunk * @stride + @start - 1, @limit].min
end
+ # Returns the `insert ignore ... select` statement, as a String, that copies
+ # the rows whose `id` is between `lowest` and `highest` from the origin table
+ # into the destination table, limited to the column list from `columns`.
def copy(lowest, highest)
- "insert ignore into `#{ @migration.destination.name }` (#{ cols.joined }) " +
- "select #{ cols.joined } from `#{ @migration.origin.name }` " +
+ "insert ignore into `#{ destination_name }` (#{ columns }) " +
+ "select #{ columns } from `#{ origin_name }` " +
"where `id` between #{ lowest } and #{ highest }"
end
+ # Smallest `id` in the origin table, converted with `to_i`, or nil when the
+ # query yields no value. The table name is backtick-quoted, exactly as
+ # `copy` quotes it, so names that need quoting do not break the query.
+ def select_start
+ start = connection.select_value("select min(id) from `#{ origin_name }`")
+ start ? start.to_i : nil
+ end
+
+ # Largest `id` in the origin table, converted with `to_i`, or nil when the
+ # query yields no value. The table name is backtick-quoted, exactly as
+ # `copy` quotes it, so names that need quoting do not break the query.
+ def select_limit
+ limit = connection.select_value("select max(id) from `#{ origin_name }`")
+ limit ? limit.to_i : nil
+ end
+
+ # @throttle divided by 1000.0 (the class comment describes `throttle` as
+ # milliseconds); `execute` passes the result to `sleep`.
+ def throttle_seconds
+ @throttle / 1000.0
+ end
+
private
- def cols
- @cols ||= @migration.intersection
+ # Name of the migration's destination, used by `copy` as the table that
+ # rows are inserted into.
+ def destination_name
+ @migration.destination.name
end
+ # Name of the migration's origin, used as the table that `copy`,
+ # `select_start` and `select_limit` read from.
+ def origin_name
+ @migration.origin.name
+ end
+
+ # Memoized value of `@migration.intersection.joined`; `copy` interpolates it
+ # as both the insert and the select column list.
+ def columns
+ @columns ||= @migration.intersection.joined
+ end
+
+ # Calls `error` when both @start and @limit are set and @start is greater
+ # than @limit; does nothing otherwise.
+ # NOTE(review): equal @start and @limit pass this check, although the
+ # message says limit must be greater than start.
+ # NOTE(review): `error` is not defined in this file - presumably it comes
+ # from an included module; confirm.
+ def validate
+ if @start && @limit && @start > @limit
+ error("impossible chunk options (limit must be greater than start)")
+ end
+ end
+
+ # For each chunk yielded by `up_to`, passes the statement built by `copy` to
+ # `update`, sleeps for `throttle_seconds` when the returned affected-row
+ # count is above 0, and prints a "." as progress output.
+ # NOTE(review): `update` is not defined in this file - presumably it comes
+ # from an included module; confirm.
def execute
- up_to(@limit) do |lowest, highest|
+ up_to do |lowest, highest|
affected_rows = update(copy(lowest, highest))
if affected_rows > 0
sleep(throttle_seconds)
end
print "."
end
end
-
- def throttle_seconds
- @throttle / 1000.0
- end
end
end
-