lib/wyrm/db_pump.rb in wyrm-0.1.4 vs lib/wyrm/db_pump.rb in wyrm-0.2.0

- old
+ new

@@ -8,16 +8,10 @@
 # TODO possibly use Gem::Package::TarWriter to write tar files
 # TODO when restoring, could use a SizeQueue to make sure the db is kept busy
 # TODO need to version the dumps, or something like that.
-# So the slowest-changing variables are the db, the io stream
-# and the page size.
-# table will change every call. Will IO stream change between
-# table changes? No. So a currying type approach will work.
-# Somebody must have done this before.
-# But table and io are often related (ie table going to one file)
 # TODO This really should be Wyrm::Hole. Or maybe Wyrm::Hole should
 # be the codec that connects two DbPumps, for direct transfer?
 class DbPump
   # some codecs might ignore io, eg if a dbpump is talking to another dbpump
   def initialize( db, table_name, io: STDOUT, codec: :marshal, page_size: 10000, dry_run: false )

@@ -29,57 +23,59 @@
     self.dry_run = dry_run
     yield self if block_given?
   end

   attr_accessor :io, :page_size, :dry_run
+  def dry_run?; dry_run; end

   # These affect cached values
   attr_reader :db, :table_name

-  def table_name=( name_sym )
+  def invalidate_cached_members
     @primary_keys = nil
     @table_dataset = nil
+  end
+
+  def table_name=( name_sym )
+    invalidate_cached_members
     @table_name = name_sym
   end

   def db=( other_db )
-    @primary_keys = nil
-    @table_dataset = nil
+    invalidate_cached_members
     @db = other_db
     @db.extension :pagination
   end

-  def dry_run?; dry_run; end
-
-  class RespondsTo
-    def initialize( *methods )
-      @methods = methods
+  # return an object that responds to ===
+  # which returns true if ==='s parameter
+  # responds to all the methods
+  def quacks_like( *methods )
+    @quacks_like ||= {}
+    @quacks_like[methods] ||= Object.new.tap do |obj|
+      obj.define_singleton_method(:===) do |instance|
+        methods.all?{|m| instance.respond_to? m}
+      end
     end
-
-    def ===( instance )
-      @methods.all?{|m| instance.respond_to? m}
-    end
   end

   def codec=( codec_thing )
     @codec = case codec_thing
     when :yaml; YamlCodec.new
     when :marshal; MarshalCodec.new
     when Class
-      codec.new
-    when RespondsTo.new( :encode, :decode )
-      codec
+      codec_thing.new
+    when quacks_like( :encode, :decode )
+      codec_thing
     else
-      raise "unknown codec #{codec}"
+      raise "unknown codec #{codec_thing}"
     end
   end

   attr_reader :codec

-  # TODO could use msgpack as serialization here, but its API is unpleasant.
-
   class MarshalCodec
     def encode( obj, io )
       Marshal.dump obj, io
     end

@@ -88,22 +84,10 @@
       yield obj if block_given?
       obj
     end
   end

-  class MsgPackCodec
-    def encode( obj, io )
-      Marshal.dump obj, io
-    end
-
-    def decode( io, &block )
-      obj = Marshal.load(io)
-      yield obj if block_given?
-      obj
-    end
-  end
-
   class YamlCodec
     def encode( obj, io )
       YAML.dump obj, io
     end

@@ -201,53 +185,72 @@
         end
       end
     end
   end

-  # TODO possible memory issues here if the rows are big. May need to fork this.
-  # TODO lazy evaluation
-  def restore( start_row: 0 )
-    logger.info "restoring #{table_name}"
-    # destination db should be same structure as incoming data
-    column_names = db.schema(table_name.to_sym).map( &:first )
-    first = ->(row){raise "schema mismatch" if row.size != column_names.size}
+  # TODO lazy evaluation / streaming
+  # start_row is zero-based
+  def restore( start_row: 0, filename: 'io' )
+    columns = table_dataset.columns
+    logger.info{ "inserting to #{table_name} #{columns.inspect}" }
+    # get the Enumerator
+    row_enum = each_row
+
+    # check that columns match
+    raise "schema mismatch" if row_enum.peek.size != columns.size
+
     rows_restored = 0

-    # skip this many rows
-    start_row.times do
-      codec.decode( io ) {|row|}
+    if start_row != 0
+      logger.info{ "skipping #{start_row} rows from #{filename}" }
+      start_row.times do |i|
+        row_enum.next
+        logger.info{ "skipped #{i} from #{filename}" } if i % page_size == 0
+      end
+      logger.info{ "skipped #{start_row} from #{filename}" }
+      rows_restored += start_row
     end

-    # copy rows into db
-    while !io.eof?
-      # fetch a page of rows
-      rows_ary = []
-      begin
-        page_size.times do |i|
-          codec.decode( io ) do |row|
-            rows_ary << row
+    logger.info{ "inserting to #{table_name} from #{rows_restored}" }
+
+    loop do
+      db.transaction do
+        begin
+          page_size.times do
+            # This skips all the checks in the Sequel code
+            sql = table_dataset.clone( columns: columns, values: row_enum.next ).send( :clause_sql, :insert )
+            db.execute sql unless dry_run?
+            rows_restored += 1
           end
-          rows_restored += 1
+        rescue StopIteration
+          # er reached the end of the inout stream.
+          # So commit this transaction, and then re-raise
+          # StopIteration to get out of the loop{} statement
+          db.after_commit{ raise StopIteration }
         end
-      rescue EOFError => e
-        # ran out of rows, so just use the ones we have so far
+        logger.info{ "#{table_name} inserted #{rows_restored}" }
       end
-
-      # insert to db. Hopeful db support bulk insert, which Sequel will figure out
-      db.transaction do
-        table_dataset.import column_names, rows_ary
-        yield rows_restored if block_given?
-        logger.info "restored #{rows_restored}"
-      end
     end
-
+    logger.info{ "#{table_name} done. Inserted #{rows_restored}." }
     rows_restored
   end

-  def self.from_bz2( filename, db, table_name, options = {} )
-    IO.popen( "pbzip2 -d -c #{filename}" ) do |io|
-      dbpump = DbPump.new db, table_name, io: io
-      dbpump.restore
+  # this doesn't really belong here, but it will do for now.
+  def open_bz2( filename )
+    io.andand.close if io != STDOUT && !io.andand.closed?
+    self.io = IO.popen( "pbzip2 -d -c #{filename}" )
+  end
+
+  # enumerate through the given io at its current position
+  def each_row
+    return enum_for(__method__) unless block_given?
+    yield codec.decode( io ) until io.eof?
+  end
+
+  def insert_sql_each
+    return enum_for(__method__) unless block_given?
+    each_row do |row|
+      yield table_dataset.insert_sql( row )
     end
   end
 end
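
Notes on selected 0.2.0 changes:

The RespondsTo matcher class becomes quacks_like: a memoized singleton object whose === does a duck-type check, so it can be used directly in a case/when. A minimal self-contained sketch of the same idea (SomeCodec is a hypothetical class, not part of wyrm):

    # An object whose === returns true when its argument
    # responds to all the listed methods.
    def quacks_like( *methods )
      Object.new.tap do |obj|
        obj.define_singleton_method(:===) do |instance|
          methods.all?{|m| instance.respond_to? m}
        end
      end
    end

    class SomeCodec
      def encode( obj, io ); end
      def decode( io ); end
    end

    case SomeCodec.new
    when quacks_like( :encode, :decode ) then puts "usable as a codec"
    else puts "unknown codec"
    end
    # prints: usable as a codec

The 0.2.0 version also fixes a bug in codec=: the old branches referenced codec (the attr_reader) where they meant the codec_thing parameter.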
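
The rewritten restore commits one page_size batch per transaction, and ends by re-raising StopIteration from Sequel's Database#after_commit hook, so the final partial page is committed before Kernel#loop (which rescues StopIteration) exits. A stripped-down sketch of that control flow, assuming the sequel and sqlite3 gems, with an array standing in for the row stream:

    require 'sequel'

    DB = Sequel.sqlite                # throwaway in-memory database
    row_enum = [[1], [2], [3]].each   # stand-in for each_row
    page_size = 2
    rows = 0

    loop do                           # loop{} rescues StopIteration
      DB.transaction do
        begin
          page_size.times do
            row_enum.next             # raises StopIteration at end of stream
            rows += 1                 # a real insert would execute SQL here
          end
        rescue StopIteration
          # commit what this transaction already holds, then re-raise
          # StopIteration after the commit to break out of loop{}
          DB.after_commit{ raise StopIteration }
        end
      end
    end
    puts "inserted #{rows} rows"      # => inserted 3 rows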
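
each_row and insert_sql_each both use the `return enum_for(__method__) unless block_given?` idiom, so one method serves as an internal iterator and as an external Enumerator; restore's peek and next calls rely on the latter. The idiom in isolation, on a toy method:

    def three_rows
      return enum_for(__method__) unless block_given?
      3.times{|i| yield [i, "row#{i}"] }
    end

    three_rows{|row| p row }   # block given: iterates immediately
    e = three_rows             # no block: returns an Enumerator
    p e.peek                   # => [0, "row0"]  (does not consume the item)
    p e.next                   # => [0, "row0"]
    p e.next                   # => [1, "row1"]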
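
The class-level from_bz2 convenience is gone; in 0.2.0 you point an existing pump's io at the decompressed stream instead. A possible usage sketch, assuming a Sequel database in db, a :things table, a dump produced by an earlier wyrm run, and pbzip2 on the PATH (all placeholders):

    pump = DbPump.new( db, :things )
    pump.open_bz2( 'things.dbp.bz2' )           # io becomes a pbzip2 pipe
    pump.restore( filename: 'things.dbp.bz2' )  # filename only labels log lines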