lib/wyrm/db_pump.rb in wyrm-0.2.0 vs lib/wyrm/db_pump.rb in wyrm-0.2.1

- old
+ new

@@ -1,22 +1,18 @@
 require 'sequel'
 require 'yaml'
-require 'ostruct'
 require 'logger'
-require 'fastandand'
 
 Sequel.extension :migration
 
-# TODO possibly use Gem::Package::TarWriter to write tar files
 # TODO when restoring, could use a SizeQueue to make sure the db is kept busy
-
 # TODO need to version the dumps, or something like that.
-# TODO This really should be Wyrm::Hole. Or maybe Wyrm::Hole should
-# be the codec that connects two DbPumps, for direct transfer?
+# TODO looks like io should belong to codec. Hmm. Not sure.
+# TODO table_name table_dataset need some thinking about. Dataset would encapsulate both. But couldn't change db then, and primary_keys would be hard.
 class DbPump
   # some codecs might ignore io, eg if a dbpump is talking to another dbpump
-  def initialize( db, table_name, io: STDOUT, codec: :marshal, page_size: 10000, dry_run: false )
+  def initialize( db: nil, table_name: nil, io: STDOUT, codec: :marshal, page_size: 10000, dry_run: false )
     self.codec = codec
     self.db = db
     self.table_name = table_name
     self.io = io
     self.page_size = page_size
@@ -40,37 +36,51 @@
     @table_name = name_sym
   end
 
   def db=( other_db )
     invalidate_cached_members
+    @db = other_db
+    return unless other_db
+
+    # add extensions
     @db.extension :pagination
+
+    # turn on postgres streaming if available
+    if defined?( Sequel::Postgres ) && Sequel::Postgres.supports_streaming?
+      logger.info "Turn streaming on for postgres"
+      @db.extension :pg_streaming
+    end
   end
 
   # return an object that responds to ===
   # which returns true if ==='s parameter
   # responds to all the methods
-  def quacks_like( *methods )
+  def self.quacks_like( *methods )
     @quacks_like ||= {}
     @quacks_like[methods] ||= Object.new.tap do |obj|
       obj.define_singleton_method(:===) do |instance|
         methods.all?{|m| instance.respond_to? m}
       end
     end
   end
 
+  def quacks_like( *methods )
+    self.class.quacks_like( *methods )
+  end
+
   def codec=( codec_thing )
     @codec =
     case codec_thing
     when :yaml; YamlCodec.new
     when :marshal; MarshalCodec.new
     when Class
       codec_thing.new
     when quacks_like( :encode, :decode )
       codec_thing
     else
-      raise "unknown codec #{codec_thing}"
+      raise "unknown codec #{codec_thing.inspect}"
     end
   end
 
   attr_reader :codec
 
@@ -108,99 +118,130 @@
   def table_dataset
     @table_dataset ||= db[table_name.to_sym]
   end
 
-  # TODO possibly use select from outer / inner join to
-  # http://www.numerati.com/2012/06/26/reading-large-result-sets-with-hibernate-and-mysql/
-  # because mysql is useless
-  def paginated_dump
+  # Use limit / offset. Last fallback if there are no keys (or a compound primary key?).
+  def paginated_dump( &encode_block )
     table_dataset.order(*primary_keys).each_page(page_size) do |page|
       logger.info page.sql
-      page.each do |row|
-        unless dry_run?
-          codec.encode row.values, io
-        end
-      end
+      page.each &encode_block
     end
   end
 
-  # have to use this for non-integer pks
+  # Use limit / offset, but not for all fields.
   # The idea is that large offsets are expensive in the db because the db server has to read
-  # through the data set to reach the required offset. So make that only ids, and then
-  # do the main select from the limited id list.
+  # through the data set to reach the required offset. So make that only ids need to be read,
+  # and then do the main select from the limited id list.
  # TODO could speed this up by have a query thread which runs the next page-query while
  # the current one is being written/compressed.
  # select * from massive as full
  # inner join (select id from massive order by whatever limit m, n) limit
  # on full.id = limit.id
  # order by full.whatever
-  def inner_dump
+  # http://www.numerati.com/2012/06/26/reading-large-result-sets-with-hibernate-and-mysql/
+  def inner_dump( &encode_block )
     # could possibly overrride Dataset#paginate(page_no, page_size, record_count=nil)
     0.step(table_dataset.count, page_size).each do |offset|
       limit_dataset = table_dataset.select( *primary_keys ).limit( page_size, offset ).order( *primary_keys )
       page = table_dataset.join( limit_dataset, Hash[ primary_keys.map{|f| [f,f]} ] ).order( *primary_keys ).qualify(table_name)
       logger.info page.sql
-      page.each do |row|
-        unless dry_run?
-          codec.encode row.values, io
+      page.each &encode_block
+    end
+  end
+
+  # Selects pages by a range of ids, using >= and <.
+  # Use this for integer pks
+  def min_max_dump( &encode_block )
+    # select max(id), min(id) from table
+    # and then split that up into 10000 size chunks.
+    # Not really important if there aren't exactly 10000
+    min, max = table_dataset.select{[min(id), max(id)]}.first.values
+    return unless min && max
+
+    # will always include the last item because page_size will be
+    # bigger than max for the last page
+    (min..max).step(page_size).each do |offset|
+      page = table_dataset.where( id: offset...(offset + page_size) )
+      logger.info page.sql
+      page.each &encode_block
+    end
+  end
+
+  def stream_dump( &encode_block )
+    logger.info "using result set streaming"
+
+    # I want to output progress every page_size records,
+    # without doing a records_count % page_size every iteration.
+    # So define an external enumerator
+    # TODO should really performance test the options here.
+    records_count = 0
+    enum = table_dataset.stream.enum_for
+    loop do
+      begin
+        page_size.times do
+          encode_block.call enum.next
+          records_count += 1
         end
+      ensure
+        logger.info "#{records_count} from #{table_dataset.sql}"
       end
     end
  end

+  # Dump the serialization of the table to the specified io.
  # TODO need to also dump a first row containing useful stuff:
  # - source table name
  # - number of rows
  # - source db url
  # - permissions?
  # These should all be in one object that can be Marshall.load-ed easily.
  def dump
+    _dump do |row|
+      codec.encode( row.values, io ) unless dry_run?
+    end
+  ensure
+    io.flush
+  end
+
+  # decide which kind of paged iteration will be best for this table.
+  # Return an iterator, or yield row hashes to the block
+  def _dump( &encode_block )
+    return enum_for(__method__) unless block_given?
     case
+    when table_dataset.respond_to?( :stream )
+      stream_dump &encode_block
     when primary_keys.empty?
-      paginated_dump
+      paginated_dump &encode_block
     when primary_keys.all?{|i| i == :id }
-      min_max_dump
+      min_max_dump &encode_block
     else
-      inner_dump
+      inner_dump &encode_block
     end
-    io.flush
  end

-  # could use this for integer pks
-  def min_max_dump
-    # select max(id), min(id) from patents
-    # and then split that up into 10000 size chunks. Not really important if there aren't exactly 10000
-    min, max = table_dataset.select{[min(id), max(id)]}.first.values
-    return unless min && max
-    # could possibly overrride Dataset#paginate(page_no, page_size, record_count=nil)
-    # TODO definitely need to refactor this
-
-    # will always include the last item because
-    (min..max).step(page_size).each do |offset|
-      page = table_dataset.where( id: offset...(offset + page_size) )
-      logger.info page.sql
-      page.each do |row|
-        unless dry_run?
-          codec.encode row.values, io
-        end
-      end
-    end
+  def dump_matches_columns?( row_enum, columns )
+    raise "schema mismatch" unless row_enum.peek.size == columns.size
+    true
+  rescue StopIteration
+    # peek threw a StopIteration, so there's no data
+    false
  end

-  # TODO lazy evaluation / streaming
+  # TODO don't generate the full insert, ie leave out the fields
+  # because we've already checked that the columns and the table
+  # match.
+  # TODO generate column names in insert, they might still work
+  # if columns have been added to the db, but not the dump.
  # start_row is zero-based
  def restore( start_row: 0, filename: 'io' )
    columns = table_dataset.columns
-    logger.info{ "inserting to #{table_name} #{columns.inspect}" }
-
-    # get the Enumerator
    row_enum = each_row
-    # check that columns match
-    raise "schema mismatch" if row_enum.peek.size != columns.size
+    return unless dump_matches_columns?( row_enum, columns )

+    logger.info{ "inserting to #{table_name} #{columns.inspect}" }
    rows_restored = 0

    if start_row != 0
      logger.info{ "skipping #{start_row} rows from #{filename}" }
      start_row.times do |i|
@@ -215,11 +256,14 @@
    loop do
      db.transaction do
        begin
          page_size.times do
-            # This skips all the checks in the Sequel code
+            # This skips all the checks in the Sequel code. Basically we want
+            # to generate the
+            # insert into (field1,field2) values (value1,value2)
+            # statement as quickly as possible.
            sql = table_dataset.clone( columns: columns, values: row_enum.next ).send( :clause_sql, :insert )
            db.execute sql unless dry_run?
            rows_restored += 1
          end
        rescue StopIteration
@@ -233,21 +277,17 @@
    end
    logger.info{ "#{table_name} done. Inserted #{rows_restored}." }
    rows_restored
  end

-  # this doesn't really belong here, but it will do for now.
-  def open_bz2( filename )
-    io.andand.close if io != STDOUT && !io.andand.closed?
-    self.io = IO.popen( "pbzip2 -d -c #{filename}" )
-  end
-
-  # enumerate through the given io at its current position
+  # Enumerate through the given io at its current position
+  # TODO don't check for io.eof here, leave that to the codec
  def each_row
    return enum_for(__method__) unless block_given?
    yield codec.decode( io ) until io.eof?
  end

+  # Enumerate sql insert statements from the dump
  def insert_sql_each
    return enum_for(__method__) unless block_given?
    each_row do |row|
      yield table_dataset.insert_sql( row )
    end
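
The 0.2.1 constructor drops the positional db and table_name parameters in favour of keywords, so a pump can be built before either is known and assigned later through db= and table_name=. A minimal usage sketch against the new signature; the connection URL and table name are placeholders, and `require 'wyrm/db_pump'` is assumed to load this file:

    require 'sequel'
    require 'wyrm/db_pump'

    db = Sequel.connect( 'postgres://localhost/source_db' )   # placeholder URL

    # 0.2.0: DbPump.new( db, :users, codec: :yaml )
    # 0.2.1: everything is a keyword, and db / table_name may be supplied later
    pump = DbPump.new( db: db, table_name: :users, codec: :yaml, page_size: 5000 )
    pump.dump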
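
codec= accepts a symbol, a codec class, or any object that responds to encode and decode; the duck-type check is built by quacks_like, which 0.2.1 memoizes on the class rather than per instance. The same trick in isolation, with a made-up NullCodec standing in for a real codec:

    # an object whose === asks respond_to? for every listed method,
    # so it can sit in a case/when like a class or a regexp
    def quacks_like( *methods )
      Object.new.tap do |obj|
        obj.define_singleton_method(:===) do |instance|
          methods.all?{|m| instance.respond_to? m}
        end
      end
    end

    class NullCodec
      def encode( obj, io ) end
      def decode( io ) end
    end

    case NullCodec.new
    when quacks_like( :encode, :decode ) then :accepted   # this branch matches
    else :rejected
    end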
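
paginated_dump and inner_dump page with limit / offset, which makes the server read past the offset for every page, while min_max_dump pages by an id range that an index can seek to directly. A sketch of the difference in the generated SQL, using Sequel's mock adapter so no real database is needed; the table and column names are invented and the exact SQL text can vary by adapter:

    require 'sequel'

    DB = Sequel.mock
    things = DB[:things]
    page_size = 10_000

    # limit / offset paging: the server must skip 50_000 rows to reach the page
    things.order(:id).limit( page_size, 50_000 ).sql
    # => "SELECT * FROM things ORDER BY id LIMIT 10000 OFFSET 50000"

    # id-range paging: the exclusive range becomes >= and <, as in min_max_dump
    things.where( id: 50_000...60_000 ).sql
    # => "SELECT * FROM things WHERE ((id >= 50000) AND (id < 60000))"

inner_dump sits between the two: it still uses limit / offset, but only over the primary key columns, and joins that id list back to the table to fetch the full rows.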
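
stream_dump walks the streamed result set through an external enumerator so it can log progress once per page_size records instead of testing a counter on every row. The same pattern with a plain array standing in for table_dataset.stream:

    rows = (1..25).map{|i| { id: i } }
    page_size = 10
    count = 0

    enum = rows.each   # external enumerator, like table_dataset.stream.enum_for
    loop do
      begin
        page_size.times do
          enum.next      # raises StopIteration at the end, which terminates loop
          count += 1
        end
      ensure
        puts "#{count} rows so far"   # runs once per page, and once at the end
      end
    end
    # prints "10 rows so far", "20 rows so far", "25 rows so far"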
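
In restore, the private clause_sql call builds each insert statement directly to skip Sequel's per-row checks. For comparison, the public insert_sql builds roughly the same statement; a sketch on the mock adapter, with invented table and column names:

    require 'sequel'

    DB = Sequel.mock
    DB[:users].insert_sql( [:id, :name], [1, 'bob'] )
    # => something like "INSERT INTO users (id, name) VALUES (1, 'bob')"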