lib/pgslice.rb in pgslice-0.1.3 vs lib/pgslice.rb in pgslice-0.1.4
- old
+ new
@@ -13,10 +13,12 @@
day: "YYYYMMDD",
month: "YYYYMM"
}
# Prepares a command run from the raw argument list: hands `args` to
# parse_args, then takes the first entry of @arguments as @command.
def initialize(args)
# Sync mode on both streams so writes are flushed immediately instead of
# sitting in Ruby's IO buffer.
+ $stdout.sync = true
+ $stderr.sync = true
parse_args(args)
# presumably parse_args is what fills @arguments — confirm against its body
@command = @arguments.shift
end
def perform
@@ -60,30 +62,28 @@
abort "Invalid period: #{period}" unless SQL_FORMAT[period.to_sym]
queries = []
queries << <<-SQL
-CREATE TABLE #{intermediate_table} (
- LIKE #{table} INCLUDING ALL
-);
+CREATE TABLE #{intermediate_table} (LIKE #{table} INCLUDING ALL);
SQL
sql_format = SQL_FORMAT[period.to_sym]
queries << <<-SQL
CREATE FUNCTION #{trigger_name}()
-RETURNS trigger AS $$
-BEGIN
- EXECUTE 'INSERT INTO public.#{table}_' || to_char(NEW.#{column}, '#{sql_format}') || ' VALUES ($1.*)' USING NEW;
- RETURN NULL;
-END;
-$$ LANGUAGE plpgsql;
+ RETURNS trigger AS $$
+ BEGIN
+ EXECUTE 'INSERT INTO #{table}_' || to_char(NEW.#{column}, '#{sql_format}') || ' VALUES ($1.*)' USING NEW;
+ RETURN NULL;
+ END;
+ $$ LANGUAGE plpgsql;
SQL
queries << <<-SQL
CREATE TRIGGER #{trigger_name}
-BEFORE INSERT ON #{intermediate_table}
-FOR EACH ROW EXECUTE PROCEDURE #{trigger_name}();
+ BEFORE INSERT ON #{intermediate_table}
+ FOR EACH ROW EXECUTE PROCEDURE #{trigger_name}();
SQL
run_queries(queries)
end
@@ -127,19 +127,17 @@
days.each do |day|
partition_name = "#{original_table}_#{day.strftime(name_format)}"
next if table_exists?(partition_name)
- date_format = "%Y-%m-%d"
-
queries << <<-SQL
-CREATE TABLE #{partition_name} (
- CHECK (#{field} >= '#{day.strftime(date_format)}'::date AND #{field} < '#{(day + inc).strftime(date_format)}'::date)
-) INHERITS (#{table});
+CREATE TABLE #{partition_name}
+ (CHECK (#{field} >= #{sql_date(day)} AND #{field} < #{sql_date(day + inc)}))
+ INHERITS (#{table});
SQL
- queries << "ALTER TABLE #{partition_name} ADD PRIMARY KEY (#{primary_key});"
+ queries << "ALTER TABLE #{partition_name} ADD PRIMARY KEY (#{primary_key});" if primary_key
index_defs.each do |index_def|
queries << index_def.sub(" ON #{original_table} USING ", " ON #{partition_name} USING ").sub(/ INDEX .+ ON /, " INDEX ON ") + ";"
end
end
@@ -163,38 +161,48 @@
abort "Table not found: #{source_table}" unless table_exists?(source_table)
abort "Table not found: #{dest_table}" unless table_exists?(dest_table)
period, field, name_format, inc, today = settings_from_table(table, dest_table)
- date_format = "%Y-%m-%d"
- existing_tables = self.existing_tables(like: "#{table}_%").select { |t| /#{Regexp.escape("#{table}_")}(\d{4,6})/.match(t) }
+ existing_tables = self.existing_tables(like: "#{table}_%").select { |t| /#{Regexp.escape("#{table}_")}(\d{4,6})/.match(t) }.sort
starting_time = DateTime.strptime(existing_tables.first.last(8), name_format)
ending_time = DateTime.strptime(existing_tables.last.last(8), name_format) + inc
primary_key = self.primary_key(table)
max_source_id = max_id(source_table, primary_key)
- max_dest_id = max_id(dest_table, primary_key)
+ max_dest_id =
+ if options[:swapped]
+ max_id(dest_table, primary_key, below: max_source_id)
+ else
+ max_id(dest_table, primary_key)
+ end
- starting_id = max_dest_id + 1
+ if max_dest_id == 0 && !options[:swapped]
+ min_source_id = min_id(source_table, primary_key, field, starting_time)
+ max_dest_id = min_source_id - 1 if min_source_id
+ end
+
+ starting_id = max_dest_id
fields = columns(source_table).join(", ")
batch_size = options[:batch_size]
- log "Overview"
- log "#{source_table} max #{primary_key}: #{max_source_id}"
- log "#{dest_table} max #{primary_key}: #{max_dest_id}"
- log "time period: #{starting_time.to_date} -> #{ending_time.to_date}"
- log
+ i = 1
+ batch_count = ((max_source_id - starting_id) / batch_size.to_f).ceil
+ while starting_id < max_source_id
+ query = <<-SQL
+/* #{i} of #{batch_count} */
+INSERT INTO #{dest_table} (#{fields})
+ SELECT #{fields} FROM #{source_table}
+ WHERE #{primary_key} > #{starting_id} AND #{primary_key} <= #{starting_id + batch_size} AND #{field} >= #{sql_date(starting_time)} AND #{field} < #{sql_date(ending_time)}
+ SQL
- log "Batches"
- while starting_id <= max_source_id
- log "#{starting_id}..#{[starting_id + batch_size - 1, max_source_id].min}"
-
- query = "INSERT INTO #{dest_table} (#{fields}) SELECT #{fields} FROM #{source_table} WHERE #{primary_key} >= #{starting_id} AND #{primary_key} < #{starting_id + batch_size} AND #{field} >= '#{starting_time.strftime(date_format)}'::date AND #{field} < '#{ending_time.strftime(date_format)}'::date"
- log query if options[:debug]
+ log_sql(query)
+ log_sql
execute(query)
starting_id += batch_size
+ i += 1
if options[:sleep] && starting_id <= max_source_id
sleep(options[:sleep])
end
end
@@ -212,10 +220,15 @@
queries = [
"ALTER TABLE #{table} RENAME TO #{retired_table};",
"ALTER TABLE #{intermediate_table} RENAME TO #{table};"
]
+
+ self.sequences(table).each do |sequence|
+ queries << "ALTER SEQUENCE #{sequence["sequence_name"]} OWNED BY #{table}.#{sequence["related_column"]};"
+ end
+
run_queries(queries)
end
def unswap
table = arguments.first
@@ -229,10 +242,15 @@
queries = [
"ALTER TABLE #{table} RENAME TO #{intermediate_table};",
"ALTER TABLE #{retired_table} RENAME TO #{table};"
]
+
+ self.sequences(table).each do |sequence|
+ queries << "ALTER SEQUENCE #{sequence["sequence_name"]} OWNED BY #{table}.#{sequence["related_column"]};"
+ end
+
run_queries(queries)
end
# arguments
@@ -240,12 +258,12 @@
opts = Slop.parse(args) do |o|
o.boolean "--intermediate"
o.boolean "--swapped"
o.boolean "--debug"
o.float "--sleep"
- o.integer "--future", default: 3
- o.integer "--past", default: 3
+ o.integer "--future", default: 0
+ o.integer "--past", default: 0
o.integer "--batch-size", default: 10000
o.boolean "--dry-run", default: false
o.on "-v", "--version", "print the version" do
log PgSlice::VERSION
@exit = true
@@ -341,18 +359,43 @@
SQL
row = execute(query, ["public", table])[0]
row && row["attname"]
end
- def max_id(table, primary_key)
- execute("SELECT MAX(#{primary_key}) FROM #{table}")[0]["max"].to_i
+ # Largest `primary_key` value found in `table`, converted with to_i
+ # (so a nil maximum comes back as 0).
+ # When `below` is given, only ids less than or equal to it are considered.
+ def max_id(table, primary_key, below: nil)
+ filter = below ? " WHERE #{primary_key} <= #{below}" : ""
+ result = execute("SELECT MAX(#{primary_key}) FROM #{table}#{filter}")
+ result[0]["max"].to_i
end
+ def min_id(table, primary_key, column, starting_time)
+ query = "SELECT MIN(#{primary_key}) FROM #{table} WHERE #{column} >= #{sql_date(starting_time)}"
+ execute(query)[0]["min"].to_i
+ end
+
# Reports whether the pg_trigger catalog holds a row whose tgname equals
# `trigger_name` and whose tgrelid is the given table; the table name is
# passed as a bind parameter and resolved through a ::regclass cast.
def has_trigger?(trigger_name, table)
- execute("SELECT 1 FROM pg_trigger WHERE tgname = $1 AND tgrelid = $2::regclass", [trigger_name, "public.#{table}"]).any?
+ execute("SELECT 1 FROM pg_trigger WHERE tgname = $1 AND tgrelid = $2::regclass", [trigger_name, table]).any?
end
+ # http://www.dbforums.com/showthread.php?1667561-How-to-list-sequences-and-the-columns-by-SQL
+ def sequences(table)
+ query = <<-SQL
+ SELECT
+ a.attname as related_column,
+ s.relname as sequence_name
+ FROM pg_class s
+ JOIN pg_depend d ON d.objid = s.oid
+ JOIN pg_class t ON d.objid = s.oid AND d.refobjid = t.oid
+ JOIN pg_attribute a ON (d.refobjid, d.refobjsubid) = (a.attrelid, a.attnum)
+ JOIN pg_namespace n ON n.oid = s.relnamespace
+ WHERE s.relkind = 'S'
+ AND n.nspname = 'public'
+ AND t.relname = $1
+ SQL
+ execute(query, [table])
+ end
+
# helpers
# Builds the insert-trigger name for `table` by appending "_insert_trigger".
def trigger_name(table)
  suffix = "_insert_trigger"
  "#{table}#{suffix}"
end
@@ -361,9 +404,13 @@
"#{table}_intermediate"
end
# Builds the "retired" table name by appending "_retired" to `table`.
def retired_name(table)
"#{table}_retired"
+ end
+
# Renders `time` as a quoted SQL date literal with an explicit cast,
# e.g. '2016-05-01'::date; any time-of-day part is dropped by the format.
+ def sql_date(time)
+ "'#{time.strftime("%Y-%m-%d")}'::date"
end
def settings_from_table(original_table, table)
trigger_name = self.trigger_name(original_table)
function_def = execute("select pg_get_functiondef(oid) from pg_proc where proname = $1", [trigger_name])[0]["pg_get_functiondef"]