lib/wyrm/db_pump.rb in wyrm-0.1.4 vs lib/wyrm/db_pump.rb in wyrm-0.2.0
- old
+ new
@@ -8,16 +8,10 @@
# TODO possibly use Gem::Package::TarWriter to write tar files
# TODO when restoring, could use a SizeQueue to make sure the db is kept busy
# TODO need to version the dumps, or something like that.
-# So the slowest-changing variables are the db, the io stream
-# and the page size.
-# table will change every call. Will IO stream change between
-# table changes? No. So a currying type approach will work.
-# Somebody must have done this before.
-# But table and io are often related (ie table going to one file)
# TODO This really should be Wyrm::Hole. Or maybe Wyrm::Hole should
# be the codec that connects two DbPumps, for direct transfer?
class DbPump
# some codecs might ignore io, eg if a dbpump is talking to another dbpump
def initialize( db, table_name, io: STDOUT, codec: :marshal, page_size: 10000, dry_run: false )
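
A minimal construction sketch for the 0.2.0 signature above; the connection
URL and table name are placeholders, not part of wyrm:

    require 'sequel'
    require 'wyrm/db_pump'

    db = Sequel.connect 'postgres://localhost/mydb'  # placeholder URL
    pump = DbPump.new db, :users, codec: :yaml, page_size: 5000
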
@@ -29,57 +23,59 @@
self.dry_run = dry_run
yield self if block_given?
end
attr_accessor :io, :page_size, :dry_run
+ def dry_run?; dry_run; end
# These affect cached values
attr_reader :db, :table_name
- def table_name=( name_sym )
+ def invalidate_cached_members
@primary_keys = nil
@table_dataset = nil
+ end
+
+ def table_name=( name_sym )
+ invalidate_cached_members
@table_name = name_sym
end
def db=( other_db )
- @primary_keys = nil
- @table_dataset = nil
+ invalidate_cached_members
@db = other_db
@db.extension :pagination
end
- def dry_run?; dry_run; end
-
- class RespondsTo
- def initialize( *methods )
- @methods = methods
+ # Return an object that responds to ===,
+ # where === returns true if its argument
+ # responds to all the given methods.
+ def quacks_like( *methods )
+ @quacks_like ||= {}
+ @quacks_like[methods] ||= Object.new.tap do |obj|
+ obj.define_singleton_method(:===) do |instance|
+ methods.all?{|m| instance.respond_to? m}
+ end
end
-
- def ===( instance )
- @methods.all?{|m| instance.respond_to? m}
- end
end
def codec=( codec_thing )
@codec =
case codec_thing
when :yaml; YamlCodec.new
when :marshal; MarshalCodec.new
when Class
- codec.new
- when RespondsTo.new( :encode, :decode )
- codec
+ codec_thing.new
+ when quacks_like( :encode, :decode )
+ codec_thing
else
- raise "unknown codec #{codec}"
+ raise "unknown codec #{codec_thing}"
end
end
attr_reader :codec
- # TODO could use msgpack as serialization here, but its API is unpleasant.
-
class MarshalCodec
def encode( obj, io )
Marshal.dump obj, io
end
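
The quacks_like matcher in the hunk above lets codec= accept any duck-typed
codec, not just the built-in ones. A sketch with a hypothetical JsonCodec
(not shipped with wyrm), assuming initialize passes the codec: option
through codec=:

    require 'json'

    class JsonCodec
      # one JSON document per line
      def encode( obj, io ); io.puts JSON.generate( obj ); end

      def decode( io, &block )
        obj = JSON.parse( io.gets )
        yield obj if block_given?
        obj
      end
    end

    # an instance matches the quacks_like( :encode, :decode ) branch,
    # because it responds to both methods
    pump = DbPump.new db, :users, codec: JsonCodec.new
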
@@ -88,22 +84,10 @@
yield obj if block_given?
obj
end
end
- class MsgPackCodec
- def encode( obj, io )
- Marshal.dump obj, io
- end
-
- def decode( io, &block )
- obj = Marshal.load(io)
- yield obj if block_given?
- obj
- end
- end
-
class YamlCodec
def encode( obj, io )
YAML.dump obj, io
end
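
Both built-in codecs share the same encode/decode contract. A stdlib-only
round-trip sketch:

    require 'stringio'

    io = StringIO.new
    codec = DbPump::MarshalCodec.new
    codec.encode [1, 'alice'], io
    io.rewind
    codec.decode( io ) {|row| p row }  # prints [1, "alice"]
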
@@ -201,53 +185,72 @@
end
end
end
end
- # TODO possible memory issues here if the rows are big. May need to fork this.
- # TODO lazy evaluation
- def restore( start_row: 0 )
- logger.info "restoring #{table_name}"
- # destination db should be same structure as incoming data
- column_names = db.schema(table_name.to_sym).map( &:first )
- first = ->(row){raise "schema mismatch" if row.size != column_names.size}
+ # TODO lazy evaluation / streaming
+ # start_row is zero-based
+ def restore( start_row: 0, filename: 'io' )
+ columns = table_dataset.columns
+ logger.info{ "inserting to #{table_name} #{columns.inspect}" }
+ # get the Enumerator
+ row_enum = each_row
+
+ # check that columns match
+ raise "schema mismatch" if row_enum.peek.size != columns.size
+
rows_restored = 0
- # skip this many rows
- start_row.times do
- codec.decode( io ) {|row|}
+ if start_row != 0
+ logger.info{ "skipping #{start_row} rows from #{filename}" }
+ start_row.times do |i|
+ row_enum.next
+ logger.info{ "skipped #{i} from #{filename}" } if i % page_size == 0
+ end
+ logger.info{ "skipped #{start_row} from #{filename}" }
+ rows_restored += start_row
end
- # copy rows into db
- while !io.eof?
- # fetch a page of rows
- rows_ary = []
- begin
- page_size.times do |i|
- codec.decode( io ) do |row|
- rows_ary << row
+ logger.info{ "inserting to #{table_name} from #{rows_restored}" }
+
+ loop do
+ db.transaction do
+ begin
+ page_size.times do
+ # This skips all the checks in the Sequel code
+ sql = table_dataset.clone( columns: columns, values: row_enum.next ).send( :clause_sql, :insert )
+ db.execute sql unless dry_run?
+ rows_restored += 1
end
- rows_restored += 1
+ rescue StopIteration
+ # We've reached the end of the input stream.
+ # So commit this transaction, and then re-raise
+ # StopIteration to get out of the loop{} statement
+ db.after_commit{ raise StopIteration }
end
- rescue EOFError => e
- # ran out of rows, so just use the ones we have so far
+ logger.info{ "#{table_name} inserted #{rows_restored}" }
end
-
- # insert to db. Hopefully the db supports bulk insert, which Sequel will figure out
- db.transaction do
- table_dataset.import column_names, rows_ary
- yield rows_restored if block_given?
- logger.info "restored #{rows_restored}"
- end
end
-
+ logger.info{ "#{table_name} done. Inserted #{rows_restored}." }
rows_restored
end
- def self.from_bz2( filename, db, table_name, options = {} )
- IO.popen( "pbzip2 -d -c #{filename}" ) do |io|
- dbpump = DbPump.new db, table_name, io: io
- dbpump.restore
+ # this doesn't really belong here, but it will do for now.
+ def open_bz2( filename )
+ io.andand.close if io != STDOUT && !io.andand.closed?
+ self.io = IO.popen( "pbzip2 -d -c #{filename}" )
+ end
+
+ # enumerate rows from io, starting at its current position
+ def each_row
+ return enum_for(__method__) unless block_given?
+ yield codec.decode( io ) until io.eof?
+ end
+
+ def insert_sql_each
+ return enum_for(__method__) unless block_given?
+ each_row do |row|
+ yield table_dataset.insert_sql( row )
end
end
end
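
An end-to-end restore sketch for the 0.2.0 API, assuming pbzip2 is installed
and users.dbp.bz2 is an existing wyrm dump; the filename: keyword is only
used in log messages:

    pump = DbPump.new db, :users
    pump.open_bz2 'users.dbp.bz2'   # replaces io with a pbzip2 pipe
    pump.restore filename: 'users.dbp.bz2'

    # or peek at decoded rows without inserting anything
    pump.open_bz2 'users.dbp.bz2'
    pump.each_row.first( 3 )
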