lib/wyrm/pump.rb in wyrm-0.4.1 vs lib/wyrm/pump.rb in wyrm-0.4.2
- old
+ new
@@ -6,306 +6,308 @@
# TODO when restoring, could use a SizeQueue to make sure the db is kept busy
# TODO need to version the dumps, or something like that.
# TODO looks like io should belong to codec. Hmm. Not sure.
# TODO table_name and table_dataset need some thinking about. Dataset would encapsulate both. But couldn't change db then, and primary_keys would be hard.
-class Wyrm::Pump
- def initialize( db: nil, table_name: nil, io: STDOUT, codec: :marshal, page_size: 10000, dry_run: false, logger: nil )
- self.codec = codec
- self.db = db
- self.table_name = table_name
- self.io = io
- self.page_size = page_size
- self.dry_run = dry_run
- self.logger = logger
- yield self if block_given?
- end
+module Wyrm
+ class Pump
+ def initialize( db: nil, table_name: nil, io: STDOUT, codec: :marshal, page_size: 10000, dry_run: false, logger: nil )
+ self.codec = codec
+ self.db = db
+ self.table_name = table_name
+ self.io = io
+ self.page_size = page_size
+ self.dry_run = dry_run
+ self.logger = logger
+ yield self if block_given?
+ end
- include Wyrm::Logger
- attr_writer :logger
+ include Wyrm::Logger
+ attr_writer :logger
- attr_accessor :io, :page_size, :dry_run
- def dry_run?; dry_run; end
+ attr_accessor :io, :page_size, :dry_run
+ def dry_run?; dry_run; end
- # These are affected by cached values
- attr_reader :db, :table_name
+ # These are affected by cached values
+ attr_reader :db, :table_name
- def invalidate_cached_members
- @primary_keys = nil
- @table_dataset = nil
- end
+ def invalidate_cached_members
+ @primary_keys = nil
+ @table_dataset = nil
+ end
- def table_name=( name_sym )
- invalidate_cached_members
- @table_name = name_sym
- end
+ def table_name=( name_sym )
+ invalidate_cached_members
+ @table_name = name_sym
+ end
- def db=( other_db )
- invalidate_cached_members
+ def db=( other_db )
+ invalidate_cached_members
- @db = other_db
- return unless other_db
+ @db = other_db
+ return unless other_db
- # add extensions
- @db.extension :pagination
+ # add extensions
+ @db.extension :pagination
- # turn on postgres streaming if available
- # also gets called for non-postgres dbs, but that seems to be fine.
- if defined?( Sequel::Postgres ) && @db.is_a?(Sequel::Postgres::Database) && defined?(Sequel::Postgres.supports_streaming?) && Sequel::Postgres.supports_streaming?
- @db.extension :pg_streaming
- logger.info "Streaming for #{@db.uri}"
- else
- logger.info "No streaming for #{@db.uri}"
+ # turn on postgres streaming if available
+ # also gets called for non-postgres dbs, but that seems to be fine.
+ if defined?( Sequel::Postgres::Database ) && @db.is_a?(Sequel::Postgres::Database) && defined?(Sequel::Postgres.supports_streaming?) && Sequel::Postgres.supports_streaming?
+ @db.extension :pg_streaming
+ logger.info "Streaming for #{@db.uri}"
+ else
+ logger.info "No streaming for #{@db.uri}"
+ end
end
- end
- # return an object that responds to ===
- # which returns true if ==='s parameter
- # responds to all the methods
- def self.quacks_like( *methods )
- @quacks_like ||= {}
- @quacks_like[methods] ||= lambda do |inst|
- methods.all?{|m| inst.respond_to? m}
+ # return an object that responds to ===
+ # which returns true if ==='s parameter
+ # responds to all the methods
+ def self.quacks_like( *methods )
+ @quacks_like ||= {}
+ @quacks_like[methods] ||= lambda do |inst|
+ methods.all?{|m| inst.respond_to? m}
+ end
end
- end
- def quacks_like( *methods )
- self.class.quacks_like( *methods )
- end
+ def quacks_like( *methods )
+ self.class.quacks_like( *methods )
+ end
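For reference, the lambda returned here is what drives the duck-typed when clause in codec= below: Proc#=== simply calls the lambda with the case value. A standalone sketch of the behaviour, using the built-in MarshalCodec and an arbitrary object:

    matcher = Wyrm::Pump.quacks_like( :encode, :decode )
    matcher === Wyrm::Pump::MarshalCodec.new   # => true, responds to both methods
    matcher === Object.new                     # => false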
- def codec=( codec_thing )
- @codec =
- case codec_thing
- when :yaml; YamlCodec.new
- when :marshal; MarshalCodec.new
- when Class
- codec_thing.new
- when quacks_like(:encode,:decode)
- codec_thing
- else
- raise "unknown codec #{codec_thing.inspect}"
+ def codec=( codec_thing )
+ @codec =
+ case codec_thing
+ when :yaml; YamlCodec.new
+ when :marshal; MarshalCodec.new
+ when Class
+ codec_thing.new
+ when quacks_like(:encode,:decode)
+ codec_thing
+ else
+ raise "unknown codec #{codec_thing.inspect}"
+ end
end
- end
- attr_reader :codec
+ attr_reader :codec
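Because codec= also accepts anything responding to encode and decode (the quacks_like branch above), a caller can supply its own codec. A minimal sketch; JsonCodec is hypothetical and not part of wyrm, and db/io stand for whatever the caller already has:

    require 'json'

    # Hypothetical line-oriented JSON codec, shaped like the built-in codecs below.
    class JsonCodec
      def encode( obj, io )
        io.puts JSON.generate( obj )
      end

      def decode( io, &block )
        obj = JSON.parse( io.readline )
        yield obj if block_given?
        obj
      end
    end

    # pump = Wyrm::Pump.new( db: db, table_name: :things, io: io, codec: JsonCodec.new )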
- class MarshalCodec
- def encode( obj, io )
- Marshal.dump obj, io
+ class MarshalCodec
+ def encode( obj, io )
+ Marshal.dump obj, io
+ end
+
+ def decode( io, &block )
+ obj = Marshal.load(io)
+ yield obj if block_given?
+ obj
+ end
end
- def decode( io, &block )
- obj = Marshal.load(io)
- yield obj if block_given?
- obj
+ class YamlCodec
+ def encode( obj, io )
+ YAML.dump obj, io
+ end
+
+ def decode( io, &block )
+ obj = YAML.load(io)
+ yield obj if block_given?
+ obj
+ end
end
- end
- class YamlCodec
- def encode( obj, io )
- YAML.dump obj, io
+ def primary_keys
+ # each_with_object([]){...} is only faster for < 3 items in 100000
+ @primary_keys ||= db.schema(table_name).map{|name,column_info| name if column_info[:primary_key]}.compact
end
- def decode( io, &block )
- obj = YAML.load(io)
- yield obj if block_given?
- obj
+ def table_dataset
+ @table_dataset ||= db[table_name.to_sym]
end
- end
- def primary_keys
- # each_with_object([]){...} is only faster for < 3 items in 100000
- @primary_keys ||= db.schema(table_name).map{|name,column_info| name if column_info[:primary_key]}.compact
- end
-
- def table_dataset
- @table_dataset ||= db[table_name.to_sym]
- end
-
- # Use limit / offset. Last fallback if there are no keys (or a compound primary key?).
- def paginated_dump( &encode_block )
- records_count = 0
- table_dataset.order(*primary_keys).each_page(page_size) do |page|
- logger.info "#{__method__} #{table_name} #{records_count}"
- logger.debug page.sql
- page.each &encode_block
- records_count += page_size
+ # Use limit / offset. Last fallback if there are no keys (or a compound primary key?).
+ def paginated_dump( &encode_block )
+ records_count = 0
+ table_dataset.order(*primary_keys).each_page(page_size) do |page|
+ logger.info "#{__method__} #{table_name} #{records_count}"
+ logger.debug page.sql
+ page.each &encode_block
+ records_count += page_size
+ end
end
- end
- # Use limit / offset, but not for all fields.
- # The idea is that large offsets are expensive in the db because the db server has to read
- # through the data set to reach the required offset. So arrange things so that only the ids need to be read,
- # and then do the main select from the limited id list.
- # select * from massive as full
- # inner join (select id from massive order by whatever limit m, n) limit
- # on full.id = limit.id
- # order by full.whatever
- # http://www.numerati.com/2012/06/26/reading-large-result-sets-with-hibernate-and-mysql/
- def inner_dump( &encode_block )
- # could possibly override Dataset#paginate(page_no, page_size, record_count=nil)
- on_conditions = primary_keys.map{|f| [f,f]}.to_h
- (0..table_dataset.count).step(page_size).each do |offset|
- limit_dataset = table_dataset.select( *primary_keys ).limit( page_size, offset ).order( *primary_keys )
- page = table_dataset.join( limit_dataset, on_conditions ).order( *primary_keys ).qualify(table_name)
- logger.info "#{__method__} #{table_name} #{offset}"
- logger.debug page.sql
- page.each &encode_block
+ # Use limit / offset, but not for all fields.
+ # The idea is that large offsets are expensive in the db because the db server has to read
+ # through the data set to reach the required offset. So arrange things so that only the ids need to be read,
+ # and then do the main select from the limited id list.
+ # select * from massive as full
+ # inner join (select id from massive order by whatever limit m, n) limit
+ # on full.id = limit.id
+ # order by full.whatever
+ # http://www.numerati.com/2012/06/26/reading-large-result-sets-with-hibernate-and-mysql/
+ def inner_dump( &encode_block )
+ # could possibly override Dataset#paginate(page_no, page_size, record_count=nil)
+ on_conditions = primary_keys.map{|f| [f,f]}.to_h
+ (0..table_dataset.count).step(page_size).each do |offset|
+ limit_dataset = table_dataset.select( *primary_keys ).limit( page_size, offset ).order( *primary_keys )
+ page = table_dataset.join( limit_dataset, on_conditions ).order( *primary_keys ).qualify(table_name)
+ logger.info "#{__method__} #{table_name} #{offset}"
+ logger.debug page.sql
+ page.each &encode_block
+ end
end
- end
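Concretely, for the table called massive in the comment above, with primary key id, page_size 10000 and offset 20000, each page built by inner_dump has roughly this shape (approximate SQL; the exact statement and aliasing depend on Sequel and the adapter):

    # SELECT massive.* FROM massive
    # INNER JOIN ( SELECT id FROM massive ORDER BY id LIMIT 10000 OFFSET 20000 ) AS t1
    #   ON ( t1.id = massive.id )
    # ORDER BY massive.id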
- # Selects pages by a range of ids, using >= and <.
- # Use this for integer pks
- def min_max_dump( &encode_block )
- # select max(id), min(id) from table
- # and then split that up into 10000 size chunks.
- # Not really important if there aren't exactly 10000
- min, max = table_dataset.select{[min(id), max(id)]}.first.values
- return unless min && max
+ # Selects pages by a range of ids, using >= and <.
+ # Use this for integer pks
+ def min_max_dump( &encode_block )
+ # select max(id), min(id) from table
+ # and then split that up into 10000 size chunks.
+ # Not really important if there aren't exactly 10000
+ min, max = table_dataset.select{[min(id), max(id)]}.first.values
+ return unless min && max
- # will always include the last item because offset + page_size
- # will extend past max for the last page
- (min..max).step(page_size).each do |offset|
- page = table_dataset.where( id: offset...(offset + page_size) )
- logger.info "#{__method__} #{table_name} #{offset}"
- logger.debug page.sql
- page.each &encode_block
+ # will always include the last item because offset + page_size
+ # will extend past max for the last page
+ (min..max).step(page_size).each do |offset|
+ page = table_dataset.where( id: offset...(offset + page_size) )
+ logger.info "#{__method__} #{table_name} #{offset}"
+ logger.debug page.sql
+ page.each &encode_block
+ end
end
- end
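For example, with min 1, max 25000 and the default page_size of 10000 (illustrative numbers only), the offsets and resulting pages are:

    (1..25_000).step(10_000).to_a   # => [1, 10001, 20001]
    # giving ranges 1...10001, 10001...20001 and 20001...30001; the last range
    # extends past max, so the final rows are always included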
- def stream_dump( &encode_block )
- logger.info "using result set streaming"
+ def stream_dump( &encode_block )
+ logger.info "using result set streaming"
- # I want to output progress every page_size records,
- # without doing a records_count % page_size every iteration.
- # So define an external enumerator
- # TODO should really performance test the options here.
- records_count = 0
- enum = table_dataset.stream.enum_for
- loop do
- begin
- page_size.times do
- encode_block.call enum.next
- records_count += 1
+ # I want to output progress every page_size records,
+ # without doing a records_count % page_size every iteration.
+ # So define an external enumerator
+ # TODO should really performance test the options here.
+ records_count = 0
+ enum = table_dataset.stream.enum_for
+ loop do
+ begin
+ page_size.times do
+ encode_block.call enum.next
+ records_count += 1
+ end
+ ensure
+ logger.info "#{__method__} #{table_name} #{records_count}" if records_count < page_size
+ logger.debug " #{records_count} from #{table_dataset.sql}"
end
- ensure
- logger.info "#{__method__} #{table_name} #{records_count}" if records_count < page_size
- logger.debug " #{records_count} from #{table_dataset.sql}"
end
end
- end
- # Dump the serialization of the table to the specified io.
- #
- # TODO need to also dump a first row containing useful stuff:
- # - source table name
- # - number of rows
- # - source db url
- # - permissions?
- # These should all be in one object that can be Marshal.load-ed easily.
- #
- # TODO could speed this up by having a query thread which runs the next page-query while
- # the current one is being written/compressed.
- def dump
- _dump do |row|
- codec.encode( row.values, io ) unless dry_run?
+ # Dump the serialization of the table to the specified io.
+ #
+ # TODO need to also dump a first row containing useful stuff:
+ # - source table name
+ # - number of rows
+ # - source db url
+ # - permissions?
+ # These should all be in one object that can be Marshal.load-ed easily.
+ #
+ # TODO could speed this up by having a query thread which runs the next page-query while
+ # the current one is being written/compressed.
+ def dump
+ _dump do |row|
+ codec.encode( row.values, io ) unless dry_run?
+ end
+ ensure
+ io.flush
end
- ensure
- io.flush
- end
- # decide which kind of paged iteration will be best for this table.
- # Return an iterator, or yield row hashes to the block
- def _dump( &encode_block )
- return enum_for(__method__) unless block_given?
- case
- when table_dataset.respond_to?( :stream )
- stream_dump &encode_block
+ # decide which kind of paged iteration will be best for this table.
+ # Return an iterator, or yield row hashes to the block
+ def _dump( &encode_block )
+ return enum_for(__method__) unless block_given?
+ case
+ when table_dataset.respond_to?( :stream )
+ stream_dump &encode_block
- when primary_keys.empty?
- paginated_dump &encode_block
+ when primary_keys.empty?
+ paginated_dump &encode_block
- when primary_keys.all?{|i| i == :id }
- min_max_dump &encode_block
+ when primary_keys.all?{|i| i == :id }
+ min_max_dump &encode_block
- else
- inner_dump &encode_block
+ else
+ inner_dump &encode_block
+ end
end
- end
- def dump_matches_columns?( row_enum, columns )
- raise "schema mismatch" unless row_enum.peek.size == columns.size
- true
- rescue StopIteration
- # peek threw a StopIteration, so there's no data
- false
- end
+ def dump_matches_columns?( row_enum, columns )
+ raise "schema mismatch" unless row_enum.peek.size == columns.size
+ true
+ rescue StopIteration
+ # peek threw a StopIteration, so there's no data
+ false
+ end
- # start_row is zero-based
- #
- # TODO don't generate the full insert, ie leave out the fields
- # because we've already checked that the columns and the table
- # match.
- # TODO generate column names in insert, they might still work
- # if columns have been added to the db, but not the dump.
- def restore( start_row: 0, filename: 'io' )
- columns = table_dataset.columns
- row_enum = each_row
+ # start_row is zero-based
+ #
+ # TODO don't generate the full insert, ie leave out the fields
+ # because we've already checked that the columns and the table
+ # match.
+ # TODO generate column names in insert, they might still work
+ # if columns have been added to the db, but not the dump.
+ def restore( start_row: 0, filename: 'io' )
+ columns = table_dataset.columns
+ row_enum = each_row
- return unless dump_matches_columns?( row_enum, columns )
+ return unless dump_matches_columns?( row_enum, columns )
- logger.info "#{__method__} inserting to #{table_name} from #{start_row}"
- logger.debug " #{columns.inspect}"
- rows_restored = 0
+ logger.info "#{__method__} inserting to #{table_name} from #{start_row}"
+ logger.debug " #{columns.inspect}"
+ rows_restored = 0
- if start_row != 0
- logger.debug{ "skipping #{start_row} rows from #{filename}" }
- start_row.times do |i|
- row_enum.next
- logger.debug{ "skipped #{i} from #{filename}" } if i % page_size == 0
+ if start_row != 0
+ logger.debug{ "skipping #{start_row} rows from #{filename}" }
+ start_row.times do |i|
+ row_enum.next
+ logger.debug{ "skipped #{i} from #{filename}" } if i % page_size == 0
+ end
+ logger.debug{ "skipped #{start_row} from #{filename}" }
+ rows_restored += start_row
end
- logger.debug{ "skipped #{start_row} from #{filename}" }
- rows_restored += start_row
- end
- loop do
- db.transaction do
- begin
- page_size.times do
- # This skips all the checks in the Sequel code. Basically we want
- # to generate the
- # insert into (field1,field2) values (value1,value2)
- # statement as quickly as possible.
- #
- # Uses a private method so it will need to be updated repeatedly.
- sql = table_dataset.clone( columns: columns, values: row_enum.next ).send(:_insert_sql)
- db.execute sql unless dry_run?
- rows_restored += 1
+ loop do
+ db.transaction do
+ begin
+ page_size.times do
+ # This skips all the checks in the Sequel code. Basically we want
+ # to generate the
+ # insert into (field1,field2) values (value1,value2)
+ # statement as quickly as possible.
+ #
+ # Uses a private method so it will need to be updated repeatedly.
+ sql = table_dataset.clone( columns: columns, values: row_enum.next ).send(:_insert_sql)
+ db.execute sql unless dry_run?
+ rows_restored += 1
+ end
+ rescue StopIteration
+ # reached the end of the input stream.
+ # So commit this transaction, and then re-raise
+ # StopIteration to get out of the loop{} statement
+ db.after_commit{ raise StopIteration }
end
- rescue StopIteration
- # reached the end of the input stream.
- # So commit this transaction, and then re-raise
- # StopIteration to get out of the loop{} statement
- db.after_commit{ raise StopIteration }
end
end
+ logger.info "#{__method__} #{table_name} done. Inserted #{rows_restored}."
+ rows_restored
end
- logger.info "#{__method__} #{table_name} done. Inserted #{rows_restored}."
- rows_restored
- end
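The commit-then-raise pattern above relies on Kernel#loop rescuing StopIteration: raising inside the transaction would roll back the final partial page, so the raise is deferred with after_commit and loop{} then exits cleanly. The loop behaviour in isolation (a runnable sketch, unrelated to any database):

    enum = [1, 2, 3].each
    loop do
      puts enum.next   # the fourth call raises StopIteration, which loop{} swallows
    end
    puts 'done'        # prints 1, 2, 3, then done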
- # Enumerate through the given io at its current position.
- # Can raise StopIteration (ie when eof is not detected)
- # MAYBE don't check for io.eof here, leave that to the codec
- def each_row
- return enum_for(__method__) unless block_given?
- yield codec.decode( io ) until io.eof?
- end
+ # Enumerate through the given io at its current position.
+ # Can raise StopIteration (ie when eof is not detected)
+ # MAYBE don't check for io.eof here, leave that to the codec
+ def each_row
+ return enum_for(__method__) unless block_given?
+ yield codec.decode( io ) until io.eof?
+ end
- # Enumerate sql insert statements from the dump
- def insert_sql_each
- return enum_for(__method__) unless block_given?
- each_row do |row|
- yield table_dataset.insert_sql( row )
+ # Enumerate sql insert statements from the dump
+ def insert_sql_each
+ return enum_for(__method__) unless block_given?
+ each_row do |row|
+ yield table_dataset.insert_sql( row )
+ end
end
end
end
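For orientation, a minimal end-to-end sketch of using the class as it now lives under the Wyrm module. The require path, connection URLs, table name and file name are placeholders; in practice wyrm's dump/restore schema front-end drives Pump and usually wraps the io in compression:

    require 'sequel'
    require 'wyrm/pump'

    src = Sequel.connect( 'postgres://localhost/source_db' )
    dst = Sequel.connect( 'sqlite://restored.sqlite3' )

    # dump one table to a file using the default marshal codec
    File.open( 'users.dbp', 'wb' ) do |io|
      Wyrm::Pump.new( db: src, table_name: :users, io: io ).dump
    end

    # load it into another database (the table must already exist there)
    File.open( 'users.dbp', 'rb' ) do |io|
      Wyrm::Pump.new( db: dst, table_name: :users, io: io ).restore
    end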