lib/wyrm/db_pump.rb in wyrm-0.1.1 vs lib/wyrm/db_pump.rb in wyrm-0.1.2
- old
+ new
@@ -4,29 +4,66 @@
require 'logger'
require 'fastandand'
Sequel.extension :migration, :schema_dumper, :pagination
-
# TODO possibly use Gem::Package::TarWriter to write tar files
# TODO when restoring, could use a SizedQueue to make sure the db is kept busy
# TODO need to version the dumps, or something like that.
+# The slowest-changing variables are the db, the io stream
+# and the page size. The table will change on every call.
+# Will the io stream change between table changes? No, so a
+# currying-style approach will work. Somebody must have done this before.
+# But table and io are often related (i.e. one table going to one file).
+# TODO This really should be Wyrm::Hole. Or maybe Wyrm::Hole should
+# be the codec that connects two DbPumps, for direct transfer?
class DbPump
+ # some codecs might ignore io, e.g. if a DbPump is talking to another DbPump
+ def initialize( db, table_name, io: STDOUT, codec: :marshal, page_size: 10000, dry_run: false )
+ self.codec = codec
+ self.db = db
+ self.table_name = table_name
+ self.io = io
+ self.page_size = page_size
+ self.dry_run = dry_run
+ yield self if block_given?
+ end
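+
+ # Rough usage sketch (connection url, file name and table are illustrative only):
+ #   db = Sequel.connect 'postgres://localhost/some_db'
+ #   File.open( 'users.dbp', 'wb' ) do |io|
+ #     DbPump.new( db, :users, io: io, page_size: 5000 ).dump
+ #   end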
+
+ attr_accessor :io, :page_size, :dry_run
+
+ # Changing these invalidates cached values, so they have custom writers below
+ attr_reader :db, :table_name
+
+ def table_name=( name_sym )
+ @primary_keys = nil
+ @table_dataset = nil
+ @table_name = name_sym
+ end
+
+ def db=( other_db )
+ @primary_keys = nil
+ @table_dataset = nil
+ @db = other_db
+ end
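+
+ # e.g. reassigning table_name drops the memoised primary_keys and
+ # table_dataset, so one DbPump instance can be re-pointed at another table
+ # (pump here is an illustrative instance):
+ #   pump.table_name = :invoices
+ #   pump.dump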
+
+ def dry_run?; dry_run; end
+
class RespondsTo
def initialize( *methods )
@methods = methods
end
def ===( instance )
@methods.all?{|m| instance.respond_to? m}
end
end
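+ # RespondsTo is a duck-typed case matcher used by codec= below, e.g. (illustrative):
+ #   RespondsTo.new( :encode, :decode ) === MarshalCodec.new  # => true
+ #   RespondsTo.new( :encode, :decode ) === Object.new        # => false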
- def initialize( codec = :marshal )
+ def codec=( codec_thing )
@codec =
- case codec
+ case codec_thing
when :yaml; YamlCodec.new
when :marshal; MarshalCodec.new
when Class
- codec.new
+ codec_thing.new
when RespondsTo.new( :encode, :decode )
@@ -78,29 +115,30 @@
def logger
@logger ||= Logger.new STDERR
end
- def primary_keys( db, table_name )
- db.schema(table_name).select{|df| df.last[:primary_key]}.map{|df| df.first}
+ def primary_keys
+ @primary_keys ||= db.schema(table_name).select{|df| df.last[:primary_key]}.map{|df| df.first}
end
+ def table_dataset
+ @table_dataset ||= db[table_name.to_sym]
+ end
+
# TODO possibly use select from outer / inner join to
# http://www.numerati.com/2012/06/26/reading-large-result-sets-with-hibernate-and-mysql/
# because mysql is useless
- def paginated_dump( table_name, options = {} )
- options = OpenStruct.new( {io: STDOUT, page_size: 10000, dry_run: false}.merge( options.to_h ) )
- pk = primary_keys options.db, table_name
- options.db[table_name].order(*pk).each_page(options[:page_size]) do |page|
+ def paginated_dump
+ table_dataset.order(*primary_keys).each_page(page_size) do |page|
logger.info page.sql
page.each do |row|
- unless options[:dry_run]
- codec.encode row.values, options.io
+ unless dry_run?
+ codec.encode row.values, io
end
end
end
- options.io.flush
end
# have to use this for non-integer pks
# The idea is that large offsets are expensive in the db because the db server has to read
# through the data set to reach the required offset. So make that only ids, and then
@@ -109,302 +147,106 @@
# the current one is being written/compressed.
# select * from massive as full
# inner join (select id from massive order by whatever limit m, n) limit
# on full.id = limit.id
# order by full.whatever
- def inner_dump( table_name, options = {} )
- options = OpenStruct.new( {io: STDOUT, page_size: 10000, dry_run: false}.merge( options.to_h ) )
- pk = primary_keys options.db, table_name
-
- table_dataset = options.db[table_name]
+ def inner_dump
# could possibly override Dataset#paginate(page_no, page_size, record_count=nil)
- 0.step(table_dataset.count, options.page_size).each do |offset|
- limit_dataset = table_dataset.select( *pk ).limit( options.page_size, offset ).order( *pk )
- page = table_dataset.join( limit_dataset, Hash[ pk.map{|f| [f,f]} ] ).order( *pk ).qualify_to(table_name)
+ 0.step(table_dataset.count, page_size).each do |offset|
+ limit_dataset = table_dataset.select( *primary_keys ).limit( page_size, offset ).order( *primary_keys )
+ page = table_dataset.join( limit_dataset, Hash[ primary_keys.map{|f| [f,f]} ] ).order( *primary_keys ).qualify_to(table_name)
logger.info page.sql
page.each do |row|
- unless options[:dry_run]
- codec.encode row.values, options.io
+ unless dry_run?
+ codec.encode row.values, io
end
end
end
- options.io.flush
end
# TODO need to also dump a first row containing useful stuff:
# - source table name
# - number of rows
# - source db url
# - permissions?
# These should all be in one object that can be Marshal.load-ed easily.
- def dump( table_name, options = {} )
- pk = primary_keys options[:db], table_name
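+ # The dump strategy is picked from the primary key:
+ #   no pk           -> paginated_dump (offset pagination)
+ #   pk is just :id  -> min_max_dump (range scans on id)
+ #   any other pk    -> inner_dump (join against a keys-only page)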
+ def dump
case
- when pk.empty?
- paginated_dump( table_name, options )
- when pk.all?{|i| i == :id }
- min_max_dump( table_name, options )
+ when primary_keys.empty?
+ paginated_dump
+ when primary_keys.all?{|i| i == :id }
+ min_max_dump
else
- inner_dump( table_name, options )
+ inner_dump
end
+ io.flush
end
# could use this for integer pks
- def min_max_dump( table_name, options = {} )
+ def min_max_dump
# select max(id), min(id) from patents
# and then split that up into chunks of 10000 ids. Doesn't matter if a chunk isn't exactly 10000 rows.
- options = OpenStruct.new( {io: STDOUT, page_size: 10000, dry_run: false}.merge( options.to_h ) )
- pk = primary_keys options.db, table_name
-
- table_dataset = options.db[table_name]
min, max = table_dataset.select{[min(id), max(id)]}.first.values
return unless min && max
# could possibly override Dataset#paginate(page_no, page_size, record_count=nil)
# TODO definitely need to refactor this
# will always include the last item because the final offset is within page_size of max
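+ # e.g. with min=1, max=25000 and page_size=10000 the pages cover
+ # id 1...10001, 10001...20001 and 20001...30001, so max always falls in a page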
- (min..max).step(options.page_size).each do |offset|
- page = table_dataset.where( id: offset...(offset+options.page_size) )
+ (min..max).step(page_size).each do |offset|
+ page = table_dataset.where( id: offset...(offset + page_size) )
logger.info page.sql
page.each do |row|
- unless options[:dry_run]
- codec.encode row.values, options.io
+ unless dry_run?
+ codec.encode row.values, io
end
end
end
- options.io.flush
end
# TODO possible memory issues here if the rows are big. May need to fork this.
# TODO lazy evaluation
- def restore( table_name, options = {} )
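+ # Rough usage sketch (file name illustrative, io opened for binary read):
+ #   DbPump.new( db, :users, io: File.open( 'users.dbp', 'rb' ) ).restore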
+ def restore( start_row: 0 )
logger.info "restoring #{table_name}"
- options = OpenStruct.new( {io: STDIN, page_size: 10000, start_row: 0, dry_run: false}.merge( options ) )
- dataset = options.db[table_name.to_sym]
# destination db should be same structure as incoming data
- column_names = options.db.schema(table_name.to_sym).map( &:first )
+ column_names = db.schema(table_name.to_sym).map( &:first )
first = ->(row){raise "schema mismatch" if row.size != column_names.size}
rows_restored = 0
# skip this many rows
- options.start_row.times do
- codec.decode( options.io ) {|row|}
+ start_row.times do
+ codec.decode( io ) {|row|}
end
# copy rows into db
- while !options.io.eof?
+ while !io.eof?
# fetch a page of rows
rows_ary = []
begin
- options.page_size.times do |i|
- codec.decode( options.io ) do |row|
+ page_size.times do |i|
+ codec.decode( io ) do |row|
rows_ary << row
end
rows_restored += 1
end
rescue EOFError => e
# ran out of rows, so just use the ones we have so far
end
# insert to db. Hopefully the db supports bulk insert, which Sequel will figure out
- options.db.transaction do
- dataset.import column_names, rows_ary
+ db.transaction do
+ table_dataset.import column_names, rows_ary
yield rows_restored if block_given?
logger.info "restored #{rows_restored}"
end
end
rows_restored
end
- def from_bz2( filename, db, table_name, options = {} )
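+ # Convenience wrapper: decompress with pbzip2 and restore in one call.
+ # Usage sketch (file name illustrative):
+ #   DbPump.from_bz2 'users.dbp.bz2', db, :users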
+ def self.from_bz2( filename, db, table_name, options = {} )
IO.popen( "pbzip2 -d -c #{filename}" ) do |io|
- restore table_name, options.merge( io: io, db: db )
+ dbpump = DbPump.new db, table_name, io: io
+ dbpump.restore
end
- end
-end
-
-# There are actually 2 sources for this:
-# one is the src db, the other is the dumped files
-# And the one that transfers live is another version
-class Schema
- def initialize( src_db, dst_db = nil )
- @src_db = src_db
- @dst_db = dst_db
- end
-
- def schema_migration
- @schema_migration ||= src_db.dump_schema_migration(:indexes=>false, :same_db => same_db)
- end
-
- def index_migration
- @index_migration ||= src_db.dump_indexes_migration(:same_db => same_db)
- end
-
- def fk_migration
- @fk_migration ||= src_db.dump_foreign_key_migration(:same_db => same_db)
- end
-
- def restore_migration
- <<-EOF
- require 'restore_migration'
- Sequel.migration do
- def db_pump
- end
-
- up do
- restore_tables
- end
-
- down do
- # from each table clear table
- each_table do |table_name|
- db_pump.restore table_name, io: io, db: db
- end
- end
- end
- EOF
- end
-
- attr_accessor :dst_db
- attr_reader :src_db
-
- def same_db
- @dst_db.andand.database_type == @src_db.andand.database_type
- end
-
- def logger
- @logger ||= Logger.new STDERR
- end
-
- # create the destination schema
- def create
- eval( @schema_migration ).apply dst_db, :up
- end
-
- # create indexes and foreign keys, and reset sequences
- def index
- logger.info "creating indexes"
- eval(@index_migration).apply dst, :up
- logger.info "creating foreign keys"
- eval(@fk_migration).apply dst, :up
-
- if dst.database_type == :postgres
- logger.info "reset primary key sequences"
- dst.tables.each{|t| dst.reset_primary_key_sequence(t)}
- logger.info "Primary key sequences reset successfully"
- end
- end
-
- def transfer_table( table_name, options = {} )
- options = OpenStruct.new( {page_size: 10000, dry_run: false}.merge( options ) )
- total_records = @src_db[table_name].count
- logger.info "transferring #{total_records}"
- column_names = @src_db.schema(table_name.to_sym).map( &:first )
-
- @src_db[table_name].each_page(options.page_size) do |page|
- logger.info "#{page.sql} of #{total_records}"
- unless options.dry_run
- @dst_db.transaction do
- rows_ary = []
- page.each do |row_hash|
- rows_ary << row_hash.values
- end
- @dst_db[table_name.to_sym].import column_names, rows_ary
- end
- end
- end
- end
-
- # copy the data in the tables
- def transfer
- create
- transfer_tables
- index
- end
-
- def dump_schema( container, options = {codec: :marshal} )
- (container + '001_schema.rb').open('w') do |io|
- io.write schema_migration
- end
-
- (container + '002_populate_tables.rb').open('w') do |io|
- io.write restore_migration
- end
-
- (container + '003_indexes.rb').open('w') do |io|
- io.write index_migration
- end
-
- (container + '004_foreign keys.rb').open('w') do |io|
- io.write fk_migration
- end
- end
-
- def load_migrations( container )
- @schema_migration = eval (container + '001_schema.rb').read
- @index_migration = eval (container + '003_indexes.rb').read
- @fk_migration = eval (container + '004_foreign keys.rb').read
- end
-
- def dump_one_table( table_name, pathname, db_pump )
- logger.info "dumping #{table_name} to #{pathname}"
- fio = pathname.open('w')
- # open subprocess in read-write mode
- zio = IO.popen( "pbzip2 -z", 'r+' )
- copier = Thread.new do
- begin
- IO.copy_stream zio, fio
- logger.debug "finished stream copy"
- ensure
- fio.close
- end
- end
-
- # generate the dump
- db_pump.dump table_name, db: src_db, io: zio
-
- # signal the copier thread to stop
- zio.close_write
- logger.debug 'finished dumping'
- # wait for copier thread to finish
- copier.join
- logger.debug 'stream copy thread finished'
- ensure
- zio.close unless zio.closed?
- fio.close unless fio.closed?
- end
-
- def dump_tables( container, options = {:codec => :marshal} )
- container = Pathname(container)
- db_pump = DbPump.new( options[:codec] )
-
- src_db.tables.each do |table_name|
- filename = container + "#{table_name}.dbp.bz2"
- dump_one_table table_name, filename, db_pump
- end
- end
-
- def restore_one_table( table_file, db_pump )
- logger.info "restoring from #{table_file}"
- table_name = table_file.basename.sub_ext('').sub_ext('').to_s.to_sym
- # check if table has been restored already, and has the correct rows,
- # otherwise pass in a start row.
- db_pump.from_bz2 table_file, dst_db, table_name
- end
-
- def restore_tables( container, options = {:codec => :marshal} )
- db_pump = DbPump.new( options[:codec] )
- table_files = Pathname.glob Pathname(container) + '*dbp.bz2'
- table_files.each{|table_file| restore_one_table table_file, db_pump}
- end
-
- def restore_tables( container, options = {:codec => :marshal} )
- container = Pathname(container)
- container.children
- end
-
- def self.transfer( src_db, dst_db )
- new( src_db, dst_db ).transfer
end
end