lib/wyrm/pump.rb in wyrm-0.3.3 vs lib/wyrm/pump.rb in wyrm-0.4.0

- old
+ new

@@ -24,11 +24,11 @@
   attr_writer :logger
 
   attr_accessor :io, :page_size, :dry_run
   def dry_run?; dry_run; end
 
-  # These affect cached values
+  # These are affected by cached values
   attr_reader :db, :table_name
 
   def invalidate_cached_members
     @primary_keys = nil
     @table_dataset = nil
@@ -111,23 +111,24 @@
       obj
     end
   end
 
   def primary_keys
-    @primary_keys ||= db.schema(table_name).select{|df| df.last[:primary_key]}.map{|df| df.first}
+    # each_with_object([]){...} is only faster for < 3 items in 100000
+    @primary_keys ||= db.schema(table_name).map{|name,column_info| name if column_info[:primary_key]}.compact
   end
 
   def table_dataset
     @table_dataset ||= db[table_name.to_sym]
   end
 
   # Use limit / offset. Last fallback if there are no keys (or a compound primary key?).
   def paginated_dump( &encode_block )
     records_count = 0
     table_dataset.order(*primary_keys).each_page(page_size) do |page|
-      logger.info{ "#{__method__} #{table_name} #{records_count}" }
-      logger.debug{ page.sql }
+      logger.info "#{__method__} #{table_name} #{records_count}"
+      logger.debug page.sql
       page.each &encode_block
       records_count += page_size
     end
   end
 
@@ -140,15 +141,16 @@
   # on full.id = limit.id
   # order by full.whatever
   # http://www.numerati.com/2012/06/26/reading-large-result-sets-with-hibernate-and-mysql/
   def inner_dump( &encode_block )
     # could possibly overrride Dataset#paginate(page_no, page_size, record_count=nil)
-    0.step(table_dataset.count, page_size).each do |offset|
+    on_conditions = primary_keys.map{|f| [f,f]}.to_h
+    (0..table_dataset.count).step(page_size).each do |offset|
       limit_dataset = table_dataset.select( *primary_keys ).limit( page_size, offset ).order( *primary_keys )
-      page = table_dataset.join( limit_dataset, Hash[ primary_keys.map{|f| [f,f]} ] ).order( *primary_keys ).qualify(table_name)
-      logger.info{ "#{__method__} #{table_name} #{offset}" }
-      logger.debug{ page.sql }
+      page = table_dataset.join( limit_dataset, on_conditions ).order( *primary_keys ).qualify(table_name)
+      logger.info "#{__method__} #{table_name} #{offset}"
+      logger.debug page.sql
       page.each &encode_block
     end
   end
 
   # Selects pages by a range of ids, using >= and <.
@@ -162,18 +164,18 @@
     # will always include the last item because page_size will be
     # bigger than max for the last page
     (min..max).step(page_size).each do |offset|
      page = table_dataset.where( id: offset...(offset + page_size) )
-      logger.info{ "#{__method__} #{table_name} #{offset}" }
-      logger.debug{ page.sql }
+      logger.info "#{__method__} #{table_name} #{offset}"
+      logger.debug page.sql
       page.each &encode_block
     end
   end
 
   def stream_dump( &encode_block )
-    logger.debug{ "using result set streaming" }
+    logger.info "using result set streaming"
 
     # I want to output progress every page_size records,
     # without doing a records_count % page_size every iteration.
     # So define an external enumerator
     # TODO should really performance test the options here.
 
@@ -184,12 +186,12 @@
         page_size.times do
           encode_block.call enum.next
          records_count += 1
         end
       ensure
-        logger.info{ "#{__method__} #{table_name} #{records_count}" if records_count < page_size }
-        logger.debug{ " from #{table_dataset.sql}" }
+        logger.info "#{__method__} #{table_name} #{records_count}" if records_count < page_size
+        logger.debug " #{records_count} from #{table_dataset.sql}"
       end
     end
   end
 
   # Dump the serialization of the table to the specified io.
@@ -216,14 +218,17 @@
 
   def _dump( &encode_block )
     return enum_for(__method__) unless block_given?
     case
     when table_dataset.respond_to?( :stream )
       stream_dump &encode_block
 
+    when primary_keys.empty?
       paginated_dump &encode_block
 
+    when primary_keys.all?{|i| i == :id }
       min_max_dump &encode_block
 
+    else
       inner_dump &encode_block
     end
   end
@@ -233,24 +238,25 @@
   rescue StopIteration
     # peek threw a StopIteration, so there's no data
     false
   end
 
+  # start_row is zero-based
+  #
   # TODO don't generate the full insert, ie leave out the fields
   # because we've already checked that the columns and the table
   # match.
   # TODO generate column names in insert, they might still work
   # if columns have been added to the db, but not the dump.
-  # start_row is zero-based
   def restore( start_row: 0, filename: 'io' )
     columns = table_dataset.columns
     row_enum = each_row
     return unless dump_matches_columns?( row_enum, columns )
 
-    logger.info{ "#{__method__} inserting to #{table_name} from #{start_row}" }
-    logger.debug{ " #{columns.inspect}" }
+    logger.info "#{__method__} inserting to #{table_name} from #{start_row}"
+    logger.debug " #{columns.inspect}"
     rows_restored = 0
 
     if start_row != 0
       logger.debug{ "skipping #{start_row} rows from #{filename}" }
       start_row.times do |i|
@@ -281,10 +287,10 @@
           # StopIteration to get out of the loop{} statement
           db.after_commit{ raise StopIteration }
         end
       end
     end
-    logger.info{ "#{__method__} #{table_name} done. Inserted #{rows_restored}." }
+    logger.info "#{__method__} #{table_name} done. Inserted #{rows_restored}."
     rows_restored
   end
 
   # Enumerate through the given io at its current position.
   # Can raise StopIteration (ie when eof is not detected)
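
Below are a few illustrative Ruby sketches of the techniques this diff touches; none of this code is from the gem itself. The rewritten primary_keys leans on the shape of Sequel's schema introspection: db.schema returns [column_name, info_hash] pairs, and the info hash carries a :primary_key flag. A minimal sketch, assuming an in-memory SQLite database and a made-up :posts table:

require 'sequel'

db = Sequel.sqlite          # in-memory database; needs the sqlite3 gem
db.create_table :posts do
  primary_key :id
  String :title
end

# db.schema yields [name, column_info] pairs; keep the names whose info
# hash is flagged as a primary key, then drop the nils with compact.
pks = db.schema(:posts).map{|name, column_info| name if column_info[:primary_key]}.compact
p pks   # => [:id]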
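
paginated_dump drives its pages through each_page, which in current Sequel comes from the pagination dataset extension and is best used on an ordered dataset. A small sketch, reusing the db handle and :posts table from the sketch above and assuming the extension is loaded on the database:

db.extension(:pagination)

db[:posts].order(:id).each_page(1000) do |page|
  # page is an ordinary dataset limited to one page of rows
  puts page.sql
  page.each { |row| p row }
end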
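
The comments above inner_dump describe the join-based paging trick from the linked numerati.com article: page a narrow, primary-key-only subquery with limit/offset, then join it back to the full table so the wide row fetch only touches one page at a time. A standalone sketch of that shape, reusing the db handle and :posts table from the first sketch, with an illustrative page size:

page_size = 10_000
dataset   = db[:posts]
pks       = [:id]

on_conditions = pks.map{|f| [f, f]}.to_h   # => {id: :id}

(0..dataset.count).step(page_size).each do |offset|
  # narrow inner query: primary keys only, paged with limit/offset
  limit_dataset = dataset.select(*pks).limit(page_size, offset).order(*pks)
  # join the key page back to the full table; qualify avoids ambiguous columns
  page = dataset.join(limit_dataset, on_conditions).order(*pks).qualify(:posts)
  page.each { |row| p row }
end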
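
min_max_dump pages by value rather than by offset: each page is a half-open id range, which Sequel renders as id >= lower AND id < upper, so sparse ids only make a page smaller, never skipped. A sketch under the same assumptions, taking the bounds from Sequel's Dataset#min and #max:

page_size = 10_000
dataset   = db[:posts]

min_id = dataset.min(:id) || 0
max_id = dataset.max(:id) || 0

(min_id..max_id).step(page_size).each do |offset|
  # exclusive Range => (id >= offset) AND (id < offset + page_size)
  page = dataset.where(id: offset...(offset + page_size))
  page.each { |row| p row }
end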
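
stream_dump's comments ask for progress output once per page_size records without testing records_count % page_size on every row, by pulling rows through an external enumerator in fixed-size batches. A rough sketch of that pattern, with a plain Range standing in for the streamed dataset; Enumerator#next raises StopIteration at end of data, which loop{} uses to terminate:

require 'logger'

logger        = Logger.new($stdout)
page_size     = 4
rows          = (1..10).each    # external enumerator standing in for streamed rows
records_count = 0

loop do
  begin
    page_size.times do
      row = rows.next           # StopIteration here bubbles up and ends loop{}
      # encode_block.call(row) would go here
      records_count += 1
    end
  ensure
    logger.info "processed #{records_count} rows so far"
  end
end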