lib/wyrm/db_pump.rb in wyrm-0.2.0 vs lib/wyrm/db_pump.rb in wyrm-0.2.1
- old
+ new
@@ -1,22 +1,18 @@
require 'sequel'
require 'yaml'
-require 'ostruct'
require 'logger'
-require 'fastandand'
Sequel.extension :migration
-# TODO possibly use Gem::Package::TarWriter to write tar files
# TODO when restoring, could use a SizeQueue to make sure the db is kept busy
-
# TODO need to version the dumps, or something like that.
-# TODO This really should be Wyrm::Hole. Or maybe Wyrm::Hole should
-# be the codec that connects two DbPumps, for direct transfer?
+# TODO looks like io should belong to codec. Hmm. Not sure.
+# TODO table_name and table_dataset need some thinking about. A Dataset would encapsulate both, but then db couldn't be changed, and primary_keys would be hard.
class DbPump
# some codecs might ignore io, e.g. if a dbpump is talking to another dbpump
- def initialize( db, table_name, io: STDOUT, codec: :marshal, page_size: 10000, dry_run: false )
+ def initialize( db: nil, table_name: nil, io: STDOUT, codec: :marshal, page_size: 10000, dry_run: false )
self.codec = codec
self.db = db
self.table_name = table_name
self.io = io
self.page_size = page_size
@@ -40,37 +36,51 @@
@table_name = name_sym
end
def db=( other_db )
invalidate_cached_members
+
@db = other_db
+ return unless other_db
+
+ # add extensions
@db.extension :pagination
+
+ # turn on postgres streaming if available
+ if defined?( Sequel::Postgres ) && Sequel::Postgres.supports_streaming?
+ logger.info "Turn streaming on for postgres"
+ @db.extension :pg_streaming
+ end
end
# return an object that responds to ===,
# where === returns true if its parameter
# responds to all the given methods
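# e.g. (illustrative):
#   quacks_like(:each) === []   #=> true
#   quacks_like(:each) === 42   #=> false, Integer has no #each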
- def quacks_like( *methods )
+ def self.quacks_like( *methods )
@quacks_like ||= {}
@quacks_like[methods] ||= Object.new.tap do |obj|
obj.define_singleton_method(:===) do |instance|
methods.all?{|m| instance.respond_to? m}
end
end
end
+ def quacks_like( *methods )
+ self.class.quacks_like( *methods )
+ end
+
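# Accepts :yaml, :marshal, a codec class, or anything that responds to encode and decode.
# A minimal duck-typed codec might look like this (illustrative sketch, names are made up):
#   require 'json'
#   class JsonCodec
#     def encode( obj, io )
#       io.puts obj.to_json
#     end
#     def decode( io )
#       JSON.parse io.gets
#     end
#   end
#   DbPump.new( db: db, table_name: :things, codec: JsonCodec.new )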
def codec=( codec_thing )
@codec =
case codec_thing
when :yaml; YamlCodec.new
when :marshal; MarshalCodec.new
when Class
codec_thing.new
when quacks_like( :encode, :decode )
codec_thing
else
- raise "unknown codec #{codec_thing}"
+ raise "unknown codec #{codec_thing.inspect}"
end
end
attr_reader :codec
@@ -108,99 +118,130 @@
def table_dataset
@table_dataset ||= db[table_name.to_sym]
end
- # TODO possibly use select from outer / inner join to
- # http://www.numerati.com/2012/06/26/reading-large-result-sets-with-hibernate-and-mysql/
- # because mysql is useless
- def paginated_dump
+ # Use limit / offset. Last fallback if there are no keys (or a compound primary key?).
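+ # Each page is roughly
+ #   SELECT * FROM some_table LIMIT 10000 OFFSET 20000
+ # which gets slower as the offset grows, hence the other strategies below.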
+ def paginated_dump( &encode_block )
table_dataset.order(*primary_keys).each_page(page_size) do |page|
logger.info page.sql
- page.each do |row|
- unless dry_run?
- codec.encode row.values, io
- end
- end
+ page.each &encode_block
end
end
- # have to use this for non-integer pks
+ # Use limit / offset, but only over the primary key columns, not the full rows.
# The idea is that large offsets are expensive in the db because the db server has to read
- # through the data set to reach the required offset. So make that only ids, and then
- # do the main select from the limited id list.
+ # through the data set to reach the required offset. So arrange for only the ids
+ # to be read there, and then do the main select from the limited id list.
# TODO could speed this up by have a query thread which runs the next page-query while
# the current one is being written/compressed.
# select * from massive as full
# inner join (select id from massive order by whatever limit m, n) limit
# on full.id = limit.id
# order by full.whatever
- def inner_dump
+ # http://www.numerati.com/2012/06/26/reading-large-result-sets-with-hibernate-and-mysql/
+ def inner_dump( &encode_block )
# could possibly override Dataset#paginate(page_no, page_size, record_count=nil)
0.step(table_dataset.count, page_size).each do |offset|
limit_dataset = table_dataset.select( *primary_keys ).limit( page_size, offset ).order( *primary_keys )
page = table_dataset.join( limit_dataset, Hash[ primary_keys.map{|f| [f,f]} ] ).order( *primary_keys ).qualify(table_name)
logger.info page.sql
- page.each do |row|
- unless dry_run?
- codec.encode row.values, io
+ page.each &encode_block
+ end
+ end
+
+ # Selects pages by a range of ids, using >= and <.
+ # Use this for integer pks
+ def min_max_dump( &encode_block )
+ # select max(id), min(id) from table
+ # and then split that up into page_size chunks.
+ # Not really important if the chunks aren't exactly page_size.
+ min, max = table_dataset.select{[min(id), max(id)]}.first.values
+ return unless min && max
+
+ # will always include the last item because the end of the final
+ # range (offset + page_size) will be past max
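+ # e.g. min=1, max=25000, page_size=10000 gives the ranges
+ # 1...10001, 10001...20001 and 20001...30001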
+ (min..max).step(page_size).each do |offset|
+ page = table_dataset.where( id: offset...(offset + page_size) )
+ logger.info page.sql
+ page.each &encode_block
+ end
+ end
+
+ def stream_dump( &encode_block )
+ logger.info "using result set streaming"
+
+ # I want to output progress every page_size records,
+ # without doing a records_count % page_size every iteration.
+ # So define an external enumerator
+ # TODO should really performance test the options here.
+ records_count = 0
+ enum = table_dataset.stream.enum_for
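+ # enum.next raises StopIteration once the rows run out; the ensure below
+ # still logs, and the bare loop rescues StopIteration and exits cleanly.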
+ loop do
+ begin
+ page_size.times do
+ encode_block.call enum.next
+ records_count += 1
end
+ ensure
+ logger.info "#{records_count} from #{table_dataset.sql}"
end
end
end
+ # Dump the serialization of the table to the specified io.
# TODO need to also dump a first row containing useful stuff:
# - source table name
# - number of rows
# - source db url
# - permissions?
# These should all be in one object that can be Marshal.load-ed easily.
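# Illustrative usage (table and file name are made up):
#   DbPump.new( db: db, table_name: :users, io: File.open( 'users.dump', 'wb' ) ).dump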
def dump
+ _dump do |row|
+ codec.encode( row.values, io ) unless dry_run?
+ end
+ ensure
+ io.flush
+ end
+
+ # Decide which kind of paged iteration will be best for this table.
+ # Returns an Enumerator if no block is given, otherwise yields row hashes to the block.
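+ # In order of preference: result set streaming if the dataset supports it,
+ # plain limit/offset when there are no primary keys, id ranges when the only
+ # primary key is :id, and the join-based paging in inner_dump otherwise.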
+ def _dump( &encode_block )
+ return enum_for(__method__) unless block_given?
case
+ when table_dataset.respond_to?( :stream )
+ stream_dump &encode_block
when primary_keys.empty?
- paginated_dump
+ paginated_dump &encode_block
when primary_keys.all?{|i| i == :id }
- min_max_dump
+ min_max_dump &encode_block
else
- inner_dump
+ inner_dump &encode_block
end
- io.flush
end
- # could use this for integer pks
- def min_max_dump
- # select max(id), min(id) from patents
- # and then split that up into 10000 size chunks. Not really important if there aren't exactly 10000
- min, max = table_dataset.select{[min(id), max(id)]}.first.values
- return unless min && max
- # could possibly overrride Dataset#paginate(page_no, page_size, record_count=nil)
- # TODO definitely need to refactor this
-
- # will always include the last item because
- (min..max).step(page_size).each do |offset|
- page = table_dataset.where( id: offset...(offset + page_size) )
- logger.info page.sql
- page.each do |row|
- unless dry_run?
- codec.encode row.values, io
- end
- end
- end
+ def dump_matches_columns?( row_enum, columns )
+ raise "schema mismatch" unless row_enum.peek.size == columns.size
+ true
+ rescue StopIteration
+ # peek threw a StopIteration, so there's no data
+ false
end
- # TODO lazy evaluation / streaming
+ # TODO don't generate the full insert, i.e. leave out the field names,
+ # because we've already checked that the columns and the table match.
+ # TODO generate column names in the insert; the rows might still insert correctly
+ # if columns have been added to the db but not to the dump.
# start_row is zero-based
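# Illustrative usage, reading a dump written by #dump above:
#   DbPump.new( db: db, table_name: :users, io: File.open( 'users.dump', 'rb' ) ).restore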
def restore( start_row: 0, filename: 'io' )
columns = table_dataset.columns
- logger.info{ "inserting to #{table_name} #{columns.inspect}" }
-
- # get the Enumerator
row_enum = each_row
- # check that columns match
- raise "schema mismatch" if row_enum.peek.size != columns.size
+ return unless dump_matches_columns?( row_enum, columns )
+ logger.info{ "inserting to #{table_name} #{columns.inspect}" }
rows_restored = 0
if start_row != 0
logger.info{ "skipping #{start_row} rows from #{filename}" }
start_row.times do |i|
@@ -215,11 +256,14 @@
loop do
db.transaction do
begin
page_size.times do
- # This skips all the checks in the Sequel code
+ # This skips all the checks in the Sequel code. Basically we want
+ # to generate the
+ # insert into some_table (field1,field2) values (value1,value2)
+ # statement as quickly as possible.
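+ # clause_sql appears to be internal to Sequel (hence the send below), so
+ # this may need attention when Sequel is upgraded.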
sql = table_dataset.clone( columns: columns, values: row_enum.next ).send( :clause_sql, :insert )
db.execute sql unless dry_run?
rows_restored += 1
end
rescue StopIteration
@@ -233,21 +277,17 @@
end
logger.info{ "#{table_name} done. Inserted #{rows_restored}." }
rows_restored
end
- # this doesn't really belong here, but it will do for now.
- def open_bz2( filename )
- io.andand.close if io != STDOUT && !io.andand.closed?
- self.io = IO.popen( "pbzip2 -d -c #{filename}" )
- end
-
- # enumerate through the given io at its current position
+ # Enumerate through the given io at its current position
+ # TODO don't check for io.eof here, leave that to the codec
def each_row
return enum_for(__method__) unless block_given?
yield codec.decode( io ) until io.eof?
end
+ # Enumerate SQL insert statements from the dump
def insert_sql_each
return enum_for(__method__) unless block_given?
each_row do |row|
yield table_dataset.insert_sql( row )
end