lib/wyrm/pump.rb in wyrm-0.3.3 vs lib/wyrm/pump.rb in wyrm-0.4.0
- old
+ new
@@ -24,11 +24,11 @@
attr_writer :logger
attr_accessor :io, :page_size, :dry_run
def dry_run?; dry_run; end
- # These affect cached values
+ # These are affected by cached values
attr_reader :db, :table_name
def invalidate_cached_members
@primary_keys = nil
@table_dataset = nil
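
The comment change above is easier to follow next to a standalone sketch of the pattern: @primary_keys and @table_dataset are memoised values derived from db and table_name, so reassigning either attribute has to drop the caches. This is a minimal sketch only, assuming the attribute writers (which sit outside this hunk) call invalidate_cached_members; TinyPump and :things are made-up names.

require 'sequel'

class TinyPump
  def initialize(db, table_name)
    @db, @table_name = db, table_name
  end

  attr_reader :db, :table_name

  # assumption: the real writer also invalidates, as Pump appears to
  def table_name=(name)
    @table_name = name
    invalidate_cached_members
  end

  def invalidate_cached_members
    @primary_keys  = nil
    @table_dataset = nil
  end

  def table_dataset
    @table_dataset ||= db[table_name.to_sym] # rebuilt lazily after invalidation
  end
end

db = Sequel.sqlite
db.create_table(:things) { primary_key :id }
pump = TinyPump.new(db, :things)
pump.table_dataset         # caches db[:things]
pump.table_name = :things  # cache dropped, rebuilt on next call
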
@@ -111,23 +111,24 @@
obj
end
end
def primary_keys
- @primary_keys ||= db.schema(table_name).select{|df| df.last[:primary_key]}.map{|df| df.first}
+ # each_with_object([]){...} is only faster for < 3 items in 100000
+ @primary_keys ||= db.schema(table_name).map{|name,column_info| name if column_info[:primary_key]}.compact
end
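
The new comment reads like shorthand for a micro-benchmark: building the key list with each_with_object only wins when fewer than about three columns match, measured over 100000 iterations. A hedged sketch of that comparison against a fake schema (the column names and counts are invented):

require 'benchmark'

# Fake db.schema output: 50 columns, the first two flagged as primary keys.
schema = (1..50).map { |i| [:"col#{i}", { primary_key: i <= 2 }] }
n = 100_000

Benchmark.bm(20) do |bm|
  bm.report('each_with_object') do
    n.times do
      schema.each_with_object([]) { |(name, info), keys| keys << name if info[:primary_key] }
    end
  end
  bm.report('select + map (old)') do
    n.times { schema.select { |df| df.last[:primary_key] }.map(&:first) }
  end
  bm.report('map + compact (new)') do
    n.times { schema.map { |name, info| name if info[:primary_key] }.compact }
  end
end
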
def table_dataset
@table_dataset ||= db[table_name.to_sym]
end
# Use limit / offset. Last fallback if there are no keys (or a compound primary key?).
def paginated_dump( &encode_block )
records_count = 0
table_dataset.order(*primary_keys).each_page(page_size) do |page|
- logger.info{ "#{__method__} #{table_name} #{records_count}" }
- logger.debug{ page.sql }
+ logger.info "#{__method__} #{table_name} #{records_count}"
+ logger.debug page.sql
page.each &encode_block
records_count += page_size
end
end
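
For reference, each_page comes from Sequel's pagination extension and pages with LIMIT/OFFSET, which is why this path is the last fallback: later offsets get progressively more expensive to compute. A rough equivalent using only core Dataset#limit against an in-memory SQLite table (table name, rows and page size are made up):

require 'sequel'

DB = Sequel.sqlite
DB.create_table(:events) { primary_key :id; String :payload }
5.times { |i| DB[:events].insert(payload: "p#{i}") }

page_size = 2
(0..DB[:events].count).step(page_size).each do |offset|
  page = DB[:events].order(:id).limit(page_size, offset)
  puts page.sql
  # roughly: SELECT * FROM events ORDER BY id LIMIT 2 OFFSET 0, then OFFSET 2, then OFFSET 4
end
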
@@ -140,15 +141,16 @@
# on full.id = limit.id
# order by full.whatever
# http://www.numerati.com/2012/06/26/reading-large-result-sets-with-hibernate-and-mysql/
def inner_dump( &encode_block )
# could possibly override Dataset#paginate(page_no, page_size, record_count=nil)
- 0.step(table_dataset.count, page_size).each do |offset|
+ on_conditions = primary_keys.map{|f| [f,f]}.to_h
+ (0..table_dataset.count).step(page_size).each do |offset|
limit_dataset = table_dataset.select( *primary_keys ).limit( page_size, offset ).order( *primary_keys )
- page = table_dataset.join( limit_dataset, Hash[ primary_keys.map{|f| [f,f]} ] ).order( *primary_keys ).qualify(table_name)
- logger.info{ "#{__method__} #{table_name} #{offset}" }
- logger.debug{ page.sql }
+ page = table_dataset.join( limit_dataset, on_conditions ).order( *primary_keys ).qualify(table_name)
+ logger.info "#{__method__} #{table_name} #{offset}"
+ logger.debug page.sql
page.each &encode_block
end
end
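
The trick behind the numerati link is easiest to see from the SQL it produces: a cheap key-only subquery does the LIMIT/OFFSET, and the full rows are fetched by joining back on the (possibly compound) primary key. A sketch against an in-memory SQLite table with a made-up composite key; the printed SQL shown in the comment is approximate:

require 'sequel'

DB = Sequel.sqlite
DB.create_table(:positions) do
  Integer :account_id
  Integer :security_id
  String  :note
  primary_key [:account_id, :security_id]
end

page_size     = 1000
primary_keys  = [:account_id, :security_id]
table_dataset = DB[:positions]
on_conditions = primary_keys.map { |f| [f, f] }.to_h

limit_dataset = table_dataset.select(*primary_keys).limit(page_size, 0).order(*primary_keys)
page = table_dataset.join(limit_dataset, on_conditions).order(*primary_keys).qualify(:positions)
puts page.sql
# roughly:
#   SELECT positions.* FROM positions
#   INNER JOIN (SELECT account_id, security_id FROM positions
#               ORDER BY account_id, security_id LIMIT 1000 OFFSET 0) AS t1
#     ON ((t1.account_id = positions.account_id) AND (t1.security_id = positions.security_id))
#   ORDER BY positions.account_id, positions.security_id
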
# Selects pages by a range of ids, using >= and <.
@@ -162,18 +164,18 @@
# will always include the last item because page_size will be
# bigger than max for the last page
(min..max).step(page_size).each do |offset|
page = table_dataset.where( id: offset...(offset + page_size) )
- logger.info{ "#{__method__} #{table_name} #{offset}" }
- logger.debug{ page.sql }
+ logger.info "#{__method__} #{table_name} #{offset}"
+ logger.debug page.sql
page.each &encode_block
end
end
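
The range version leans on Sequel translating an exclusive Ruby range into >= and <, so consecutive pages never overlap and the final short page is still covered even when max is not a multiple of page_size. In isolation (made-up table, bounds and page size):

require 'sequel'

DB = Sequel.sqlite
DB.create_table(:rows) { primary_key :id }

page_size = 3
min, max  = 1, 7
(min..max).step(page_size).each do |offset|
  page = DB[:rows].where(id: offset...(offset + page_size))
  puts page.sql
end
# roughly:
#   SELECT * FROM rows WHERE ((id >= 1) AND (id < 4))
#   SELECT * FROM rows WHERE ((id >= 4) AND (id < 7))
#   SELECT * FROM rows WHERE ((id >= 7) AND (id < 10))
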
def stream_dump( &encode_block )
- logger.debug{ "using result set streaming" }
+ logger.info "using result set streaming"
# I want to output progress every page_size records,
# without doing a records_count % page_size every iteration.
# So define an external enumerator
# TODO should really performance test the options here.
@@ -184,12 +186,12 @@
page_size.times do
encode_block.call enum.next
records_count += 1
end
ensure
- logger.info{ "#{__method__} #{table_name} #{records_count}" if records_count < page_size }
- logger.debug{ " from #{table_dataset.sql}" }
+ logger.info "#{__method__} #{table_name} #{records_count}" if records_count < page_size
+ logger.debug " #{records_count} from #{table_dataset.sql}"
end
end
end
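
The external-enumerator pattern used here pares down to plain Ruby: pull from the streamed result set with Enumerator#next in blocks of page_size, and let the StopIteration raised on the final short page fall through to the enclosing loop{}, with the ensure clause emitting the progress line either way. A self-contained sketch, where the array stands in for the streamed dataset:

records   = (1..10).to_a   # stand-in for the streaming result set
page_size = 4
enum      = records.each   # external enumerator
records_count = 0

loop do
  begin
    page_size.times do
      enum.next            # raises StopIteration once the source is exhausted
      records_count += 1
    end
  ensure
    puts "dumped #{records_count} records so far"
  end
end
# prints 4, 8, then 10; loop{} then swallows the StopIteration and exits
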
# Dump the serialization of the table to the specified io.
@@ -216,14 +218,17 @@
def _dump( &encode_block )
return enum_for(__method__) unless block_given?
case
when table_dataset.respond_to?( :stream )
stream_dump &encode_block
+
when primary_keys.empty?
paginated_dump &encode_block
+
when primary_keys.all?{|i| i == :id }
min_max_dump &encode_block
+
else
inner_dump &encode_block
end
end
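
The dispatch itself is plain; the one idiom worth calling out is the first line, which makes _dump either push rows into a block or hand back an Enumerator depending on how it is called. A reduced sketch of that idiom with a trivial body standing in for the real dump:

def each_record
  return enum_for(__method__) unless block_given?
  3.times { |i| yield({ id: i }) }
end

each_record { |row| p row }  # block form: rows pushed to the block
each_record.take(2)          # enumerator form: take stops the iteration after two rows
# => [{:id=>0}, {:id=>1}]
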
@@ -233,24 +238,25 @@
rescue StopIteration
# peek threw a StopIteration, so there's no data
false
end
+ # start_row is zero-based
+ #
# TODO don't generate the full insert, ie leave out the fields
# because we've already checked that the columns and the table
# match.
# TODO generate column names in insert, they might still work
# if columns have been added to the db, but not the dump.
- # start_row is zero-based
def restore( start_row: 0, filename: 'io' )
columns = table_dataset.columns
row_enum = each_row
return unless dump_matches_columns?( row_enum, columns )
- logger.info{ "#{__method__} inserting to #{table_name} from #{start_row}" }
- logger.debug{ " #{columns.inspect}" }
+ logger.info "#{__method__} inserting to #{table_name} from #{start_row}"
+ logger.debug " #{columns.inspect}"
rows_restored = 0
if start_row != 0
logger.debug{ "skipping #{start_row} rows from #{filename}" }
start_row.times do |i|
@@ -281,10 +287,10 @@
# StopIteration to get out of the loop{} statement
db.after_commit{ raise StopIteration }
end
end
end
- logger.info{ "#{__method__} #{table_name} done. Inserted #{rows_restored}." }
+ logger.info "#{__method__} #{table_name} done. Inserted #{rows_restored}."
rows_restored
end
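
The loop that the final logger line closes off relies on the escape hatch visible in the hunk above: when the row enumerator runs dry mid-batch, the code registers db.after_commit{ raise StopIteration } so the current transaction still commits and the exception then breaks the surrounding loop{}. A reduced sketch of that shape, with invented table, rows and batch size:

require 'sequel'

DB = Sequel.sqlite
DB.create_table(:items) { primary_key :id; String :name }

rows       = [{ name: 'a' }, { name: 'b' }, { name: 'c' }].each
batch_size = 2

loop do
  DB.transaction do
    begin
      batch_size.times { DB[:items].insert(rows.next) }
    rescue StopIteration
      # source exhausted mid-batch: let this transaction commit,
      # then break out of loop{} once the commit has happened
      DB.after_commit { raise StopIteration }
    end
  end
end

puts DB[:items].count # => 3
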
# Enumerate through the given io at its current position.
# Can raise StopIteration (ie when eof is not detected)