lib/pgslice.rb in pgslice-0.1.3 vs lib/pgslice.rb in pgslice-0.1.4

- old
+ new

@@ -13,10 +13,12 @@ day: "YYYYMMDD", month: "YYYYMM" } def initialize(args) + $stdout.sync = true + $stderr.sync = true parse_args(args) @command = @arguments.shift end def perform @@ -60,30 +62,28 @@ abort "Invalid period: #{period}" unless SQL_FORMAT[period.to_sym] queries = [] queries << <<-SQL -CREATE TABLE #{intermediate_table} ( - LIKE #{table} INCLUDING ALL -); +CREATE TABLE #{intermediate_table} (LIKE #{table} INCLUDING ALL); SQL sql_format = SQL_FORMAT[period.to_sym] queries << <<-SQL CREATE FUNCTION #{trigger_name}() -RETURNS trigger AS $$ -BEGIN - EXECUTE 'INSERT INTO public.#{table}_' || to_char(NEW.#{column}, '#{sql_format}') || ' VALUES ($1.*)' USING NEW; - RETURN NULL; -END; -$$ LANGUAGE plpgsql; + RETURNS trigger AS $$ + BEGIN + EXECUTE 'INSERT INTO #{table}_' || to_char(NEW.#{column}, '#{sql_format}') || ' VALUES ($1.*)' USING NEW; + RETURN NULL; + END; + $$ LANGUAGE plpgsql; SQL queries << <<-SQL CREATE TRIGGER #{trigger_name} -BEFORE INSERT ON #{intermediate_table} -FOR EACH ROW EXECUTE PROCEDURE #{trigger_name}(); + BEFORE INSERT ON #{intermediate_table} + FOR EACH ROW EXECUTE PROCEDURE #{trigger_name}(); SQL run_queries(queries) end @@ -127,19 +127,17 @@ days.each do |day| partition_name = "#{original_table}_#{day.strftime(name_format)}" next if table_exists?(partition_name) - date_format = "%Y-%m-%d" - queries << <<-SQL -CREATE TABLE #{partition_name} ( - CHECK (#{field} >= '#{day.strftime(date_format)}'::date AND #{field} < '#{(day + inc).strftime(date_format)}'::date) -) INHERITS (#{table}); +CREATE TABLE #{partition_name} + (CHECK (#{field} >= #{sql_date(day)} AND #{field} < #{sql_date(day + inc)})) + INHERITS (#{table}); SQL - queries << "ALTER TABLE #{partition_name} ADD PRIMARY KEY (#{primary_key});" + queries << "ALTER TABLE #{partition_name} ADD PRIMARY KEY (#{primary_key});" if primary_key index_defs.each do |index_def| queries << index_def.sub(" ON #{original_table} USING ", " ON #{partition_name} USING ").sub(/ INDEX .+ ON /, " INDEX ON ") + ";" end end @@ -163,38 +161,48 @@ abort "Table not found: #{source_table}" unless table_exists?(source_table) abort "Table not found: #{dest_table}" unless table_exists?(dest_table) period, field, name_format, inc, today = settings_from_table(table, dest_table) - date_format = "%Y-%m-%d" - existing_tables = self.existing_tables(like: "#{table}_%").select { |t| /#{Regexp.escape("#{table}_")}(\d{4,6})/.match(t) } + existing_tables = self.existing_tables(like: "#{table}_%").select { |t| /#{Regexp.escape("#{table}_")}(\d{4,6})/.match(t) }.sort starting_time = DateTime.strptime(existing_tables.first.last(8), name_format) ending_time = DateTime.strptime(existing_tables.last.last(8), name_format) + inc primary_key = self.primary_key(table) max_source_id = max_id(source_table, primary_key) - max_dest_id = max_id(dest_table, primary_key) + max_dest_id = + if options[:swapped] + max_id(dest_table, primary_key, below: max_source_id) + else + max_id(dest_table, primary_key) + end - starting_id = max_dest_id + 1 + if max_dest_id == 0 && !options[:swapped] + min_source_id = min_id(source_table, primary_key, field, starting_time) + max_dest_id = min_source_id - 1 if min_source_id + end + + starting_id = max_dest_id fields = columns(source_table).join(", ") batch_size = options[:batch_size] - log "Overview" - log "#{source_table} max #{primary_key}: #{max_source_id}" - log "#{dest_table} max #{primary_key}: #{max_dest_id}" - log "time period: #{starting_time.to_date} -> #{ending_time.to_date}" - log + i = 1 + batch_count = ((max_source_id - starting_id) / batch_size.to_f).ceil + while starting_id < max_source_id + query = <<-SQL +/* #{i} of #{batch_count} */ +INSERT INTO #{dest_table} (#{fields}) + SELECT #{fields} FROM #{source_table} + WHERE #{primary_key} > #{starting_id} AND #{primary_key} <= #{starting_id + batch_size} AND #{field} >= #{sql_date(starting_time)} AND #{field} < #{sql_date(ending_time)} + SQL - log "Batches" - while starting_id <= max_source_id - log "#{starting_id}..#{[starting_id + batch_size - 1, max_source_id].min}" - - query = "INSERT INTO #{dest_table} (#{fields}) SELECT #{fields} FROM #{source_table} WHERE #{primary_key} >= #{starting_id} AND #{primary_key} < #{starting_id + batch_size} AND #{field} >= '#{starting_time.strftime(date_format)}'::date AND #{field} < '#{ending_time.strftime(date_format)}'::date" - log query if options[:debug] + log_sql(query) + log_sql execute(query) starting_id += batch_size + i += 1 if options[:sleep] && starting_id <= max_source_id sleep(options[:sleep]) end end @@ -212,10 +220,15 @@ queries = [ "ALTER TABLE #{table} RENAME TO #{retired_table};", "ALTER TABLE #{intermediate_table} RENAME TO #{table};" ] + + self.sequences(table).each do |sequence| + queries << "ALTER SEQUENCE #{sequence["sequence_name"]} OWNED BY #{table}.#{sequence["related_column"]};" + end + run_queries(queries) end def unswap table = arguments.first @@ -229,10 +242,15 @@ queries = [ "ALTER TABLE #{table} RENAME TO #{intermediate_table};", "ALTER TABLE #{retired_table} RENAME TO #{table};" ] + + self.sequences(table).each do |sequence| + queries << "ALTER SEQUENCE #{sequence["sequence_name"]} OWNED BY #{table}.#{sequence["related_column"]};" + end + run_queries(queries) end # arguments @@ -240,12 +258,12 @@ opts = Slop.parse(args) do |o| o.boolean "--intermediate" o.boolean "--swapped" o.boolean "--debug" o.float "--sleep" - o.integer "--future", default: 3 - o.integer "--past", default: 3 + o.integer "--future", default: 0 + o.integer "--past", default: 0 o.integer "--batch-size", default: 10000 o.boolean "--dry-run", default: false o.on "-v", "--version", "print the version" do log PgSlice::VERSION @exit = true @@ -341,18 +359,43 @@ SQL row = execute(query, ["public", table])[0] row && row["attname"] end - def max_id(table, primary_key) - execute("SELECT MAX(#{primary_key}) FROM #{table}")[0]["max"].to_i + def max_id(table, primary_key, below: nil) + query = "SELECT MAX(#{primary_key}) FROM #{table}" + query << " WHERE #{primary_key} <= #{below}" if below + execute(query)[0]["max"].to_i end + def min_id(table, primary_key, column, starting_time) + query = "SELECT MIN(#{primary_key}) FROM #{table} WHERE #{column} >= #{sql_date(starting_time)}" + execute(query)[0]["min"].to_i + end + def has_trigger?(trigger_name, table) - execute("SELECT 1 FROM pg_trigger WHERE tgname = $1 AND tgrelid = $2::regclass", [trigger_name, "public.#{table}"]).any? + execute("SELECT 1 FROM pg_trigger WHERE tgname = $1 AND tgrelid = $2::regclass", [trigger_name, table]).any? end + # http://www.dbforums.com/showthread.php?1667561-How-to-list-sequences-and-the-columns-by-SQL + def sequences(table) + query = <<-SQL + SELECT + a.attname as related_column, + s.relname as sequence_name + FROM pg_class s + JOIN pg_depend d ON d.objid = s.oid + JOIN pg_class t ON d.objid = s.oid AND d.refobjid = t.oid + JOIN pg_attribute a ON (d.refobjid, d.refobjsubid) = (a.attrelid, a.attnum) + JOIN pg_namespace n ON n.oid = s.relnamespace + WHERE s.relkind = 'S' + AND n.nspname = 'public' + AND t.relname = $1 + SQL + execute(query, [table]) + end + # helpers def trigger_name(table) "#{table}_insert_trigger" end @@ -361,9 +404,13 @@ "#{table}_intermediate" end def retired_name(table) "#{table}_retired" + end + + def sql_date(time) + "'#{time.strftime("%Y-%m-%d")}'::date" end def settings_from_table(original_table, table) trigger_name = self.trigger_name(original_table) function_def = execute("select pg_get_functiondef(oid) from pg_proc where proname = $1", [trigger_name])[0]["pg_get_functiondef"]