lib/pgslice.rb in pgslice-0.4.0 vs lib/pgslice.rb in pgslice-0.4.1
- old
+ new
@@ -52,11 +52,13 @@
# commands
def prep
table, column, period = arguments
+ table = qualify_table(table)
intermediate_table = "#{table}_intermediate"
+
trigger_name = self.trigger_name(table)
if options[:no_partition]
abort "Usage: pgslice prep <table> --no-partition" if arguments.length != 1
abort "Can't use --trigger-based and --no-partition" if options[:trigger_based]
@@ -75,25 +77,25 @@
declarative = server_version_num >= 100000 && !options[:trigger_based]
if declarative && !options[:no_partition]
queries << <<-SQL
-CREATE TABLE #{quote_ident(intermediate_table)} (LIKE #{quote_ident(table)} INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING STORAGE INCLUDING COMMENTS) PARTITION BY RANGE (#{quote_ident(column)});
+CREATE TABLE #{quote_table(intermediate_table)} (LIKE #{quote_table(table)} INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING STORAGE INCLUDING COMMENTS) PARTITION BY RANGE (#{quote_table(column)});
SQL
# add comment
cast = column_cast(table, column)
queries << <<-SQL
-COMMENT ON TABLE #{quote_ident(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}';
+COMMENT ON TABLE #{quote_table(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}';
SQL
else
queries << <<-SQL
-CREATE TABLE #{quote_ident(intermediate_table)} (LIKE #{quote_ident(table)} INCLUDING ALL);
+CREATE TABLE #{quote_table(intermediate_table)} (LIKE #{quote_table(table)} INCLUDING ALL);
SQL
foreign_keys(table).each do |fk_def|
- queries << "ALTER TABLE #{quote_ident(intermediate_table)} ADD #{fk_def};"
+ queries << "ALTER TABLE #{quote_table(intermediate_table)} ADD #{fk_def};"
end
end
if !options[:no_partition] && !declarative
sql_format = SQL_FORMAT[period.to_sym]
@@ -106,40 +108,40 @@
$$ LANGUAGE plpgsql;
SQL
queries << <<-SQL
CREATE TRIGGER #{quote_ident(trigger_name)}
- BEFORE INSERT ON #{quote_ident(intermediate_table)}
+ BEFORE INSERT ON #{quote_table(intermediate_table)}
FOR EACH ROW EXECUTE PROCEDURE #{quote_ident(trigger_name)}();
SQL
cast = column_cast(table, column)
queries << <<-SQL
-COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_ident(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}';
+COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_table(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}';
SQL
end
run_queries(queries)
end
def unprep
- table = arguments.first
+ table = qualify_table(arguments.first)
intermediate_table = "#{table}_intermediate"
trigger_name = self.trigger_name(table)
abort "Usage: pgslice unprep <table>" if arguments.length != 1
abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table)
queries = [
- "DROP TABLE #{quote_ident(intermediate_table)} CASCADE;",
+ "DROP TABLE #{quote_table(intermediate_table)} CASCADE;",
"DROP FUNCTION IF EXISTS #{quote_ident(trigger_name)}();"
]
run_queries(queries)
end
def add_partitions
- original_table = arguments.first
+ original_table = qualify_table(arguments.first)
table = options[:intermediate] ? "#{original_table}_intermediate" : original_table
trigger_name = self.trigger_name(original_table)
abort "Usage: pgslice add_partitions <table>" if arguments.length != 1
abort "Table not found: #{table}" unless table_exists?(table)
@@ -156,11 +158,11 @@
end
queries = []
if needs_comment
- queries << "COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_ident(table)} is 'column:#{field},period:#{period},cast:#{cast}';"
+ queries << "COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_table(table)} is 'column:#{field},period:#{period},cast:#{cast}';"
end
# today = utc date
today = round_date(DateTime.now.new_offset(0).to_date, period)
@@ -168,13 +170,14 @@
if !declarative
table
elsif options[:intermediate]
original_table
else
- "#{original_table}_#{today.strftime(name_format(period))}"
+ existing_partitions(original_table, period).last
end
- index_defs = execute("SELECT pg_get_indexdef(indexrelid) FROM pg_index WHERE indrelid = #{regclass(schema, schema_table)} AND indisprimary = 'f'").map { |r| r["pg_get_indexdef"] }
+
+ index_defs = execute("SELECT pg_get_indexdef(indexrelid) FROM pg_index WHERE indrelid = #{regclass(schema_table)} AND indisprimary = 'f'").map { |r| r["pg_get_indexdef"] }
fk_defs = foreign_keys(schema_table)
primary_key = self.primary_key(schema_table)
added_partitions = []
range.each do |n|
@@ -184,46 +187,46 @@
next if table_exists?(partition_name)
added_partitions << partition_name
if declarative
queries << <<-SQL
-CREATE TABLE #{quote_ident(partition_name)} PARTITION OF #{quote_ident(table)} FOR VALUES FROM (#{sql_date(day, cast, false)}) TO (#{sql_date(advance_date(day, period, 1), cast, false)});
+CREATE TABLE #{quote_table(partition_name)} PARTITION OF #{quote_table(table)} FOR VALUES FROM (#{sql_date(day, cast, false)}) TO (#{sql_date(advance_date(day, period, 1), cast, false)});
SQL
else
queries << <<-SQL
-CREATE TABLE #{quote_ident(partition_name)}
+CREATE TABLE #{quote_table(partition_name)}
(CHECK (#{quote_ident(field)} >= #{sql_date(day, cast)} AND #{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}))
- INHERITS (#{quote_ident(table)});
+ INHERITS (#{quote_table(table)});
SQL
end
- queries << "ALTER TABLE #{quote_ident(partition_name)} ADD PRIMARY KEY (#{quote_ident(primary_key)});" if primary_key
+ queries << "ALTER TABLE #{quote_table(partition_name)} ADD PRIMARY KEY (#{primary_key.map { |k| quote_ident(k) }.join(", ")});" if primary_key.any?
index_defs.each do |index_def|
- queries << index_def.sub(/ ON \S+ USING /, " ON #{quote_ident(partition_name)} USING ").sub(/ INDEX .+ ON /, " INDEX ON ") + ";"
+ queries << index_def.sub(/ ON \S+ USING /, " ON #{quote_table(partition_name)} USING ").sub(/ INDEX .+ ON /, " INDEX ON ") + ";"
end
fk_defs.each do |fk_def|
- queries << "ALTER TABLE #{quote_ident(partition_name)} ADD #{fk_def};"
+ queries << "ALTER TABLE #{quote_table(partition_name)} ADD #{fk_def};"
end
end
unless declarative
# update trigger based on existing partitions
current_defs = []
future_defs = []
past_defs = []
name_format = self.name_format(period)
- existing_tables = existing_partitions(original_table)
+ existing_tables = existing_partitions(original_table, period)
existing_tables = (existing_tables + added_partitions).uniq.sort
existing_tables.each do |table|
day = DateTime.strptime(table.split("_").last, name_format)
partition_name = "#{original_table}_#{day.strftime(name_format(period))}"
sql = "(NEW.#{quote_ident(field)} >= #{sql_date(day, cast)} AND NEW.#{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}) THEN
- INSERT INTO #{quote_ident(partition_name)} VALUES (NEW.*);"
+ INSERT INTO #{quote_table(partition_name)} VALUES (NEW.*);"
if day.to_date < today
past_defs << sql
elsif advance_date(day, period, 1) < today
current_defs << sql
@@ -253,11 +256,11 @@
run_queries(queries) if queries.any?
end
def fill
- table = arguments.first
+ table = qualify_table(arguments.first)
abort "Usage: pgslice fill <table>" if arguments.length != 1
source_table = options[:source_table]
dest_table = options[:dest_table]
@@ -276,20 +279,22 @@
period, field, cast, needs_comment, declarative = settings_from_trigger(table, dest_table)
if period
name_format = self.name_format(period)
- existing_tables = existing_partitions(table)
+ existing_tables = existing_partitions(table, period)
if existing_tables.any?
starting_time = DateTime.strptime(existing_tables.first.split("_").last, name_format)
ending_time = advance_date(DateTime.strptime(existing_tables.last.split("_").last, name_format), period, 1)
end
end
schema_table = period && declarative ? existing_tables.last : table
- primary_key = self.primary_key(schema_table)
+
+ primary_key = self.primary_key(schema_table)[0]
abort "No primary key" unless primary_key
+
max_source_id = max_id(source_table, primary_key)
max_dest_id =
if options[:start]
options[:start]
@@ -324,12 +329,12 @@
where << " AND #{options[:where]}"
end
query = <<-SQL
/* #{i} of #{batch_count} */
-INSERT INTO #{quote_ident(dest_table)} (#{fields})
- SELECT #{fields} FROM #{quote_ident(source_table)}
+INSERT INTO #{quote_table(dest_table)} (#{fields})
+ SELECT #{fields} FROM #{quote_table(source_table)}
WHERE #{where}
SQL
run_query(query)
@@ -341,22 +346,22 @@
end
end
end
def swap
- table = arguments.first
+ table = qualify_table(arguments.first)
intermediate_table = intermediate_name(table)
retired_table = retired_name(table)
abort "Usage: pgslice swap <table>" if arguments.length != 1
abort "Table not found: #{table}" unless table_exists?(table)
abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table)
abort "Table already exists: #{retired_table}" if table_exists?(retired_table)
queries = [
- "ALTER TABLE #{quote_ident(table)} RENAME TO #{quote_ident(retired_table)};",
- "ALTER TABLE #{quote_ident(intermediate_table)} RENAME TO #{quote_ident(table)};"
+ "ALTER TABLE #{quote_table(table)} RENAME TO #{quote_no_schema(retired_table)};",
+ "ALTER TABLE #{quote_table(intermediate_table)} RENAME TO #{quote_no_schema(table)};"
]
self.sequences(table).each do |sequence|
queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_name"])} OWNED BY #{quote_ident(table)}.#{quote_ident(sequence["related_column"])};"
end
@@ -365,40 +370,40 @@
run_queries(queries)
end
def unswap
- table = arguments.first
+ table = qualify_table(arguments.first)
intermediate_table = intermediate_name(table)
retired_table = retired_name(table)
abort "Usage: pgslice unswap <table>" if arguments.length != 1
abort "Table not found: #{table}" unless table_exists?(table)
abort "Table not found: #{retired_table}" unless table_exists?(retired_table)
abort "Table already exists: #{intermediate_table}" if table_exists?(intermediate_table)
queries = [
- "ALTER TABLE #{quote_ident(table)} RENAME TO #{quote_ident(intermediate_table)};",
- "ALTER TABLE #{quote_ident(retired_table)} RENAME TO #{quote_ident(table)};"
+ "ALTER TABLE #{quote_table(table)} RENAME TO #{quote_no_schema(intermediate_table)};",
+ "ALTER TABLE #{quote_table(retired_table)} RENAME TO #{quote_no_schema(table)};"
]
self.sequences(table).each do |sequence|
queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_name"])} OWNED BY #{quote_ident(table)}.#{quote_ident(sequence["related_column"])};"
end
run_queries(queries)
end
def analyze
- table = arguments.first
+ table = qualify_table(arguments.first)
parent_table = options[:swapped] ? table : intermediate_name(table)
abort "Usage: pgslice analyze <table>" if arguments.length != 1
existing_tables = existing_partitions(table)
analyze_list = existing_tables + [parent_table]
- run_queries_without_transaction analyze_list.map { |t| "ANALYZE VERBOSE #{quote_ident(t)};" }
+ run_queries_without_transaction analyze_list.map { |t| "ANALYZE VERBOSE #{quote_table(t)};" }
end
# arguments
def parse_args(args)
@@ -506,25 +511,35 @@
def server_version_num
execute("SHOW server_version_num")[0]["server_version_num"].to_i
end
- def existing_partitions(table)
- existing_tables(like: "#{table}_%").select { |t| /\A#{Regexp.escape("#{table}_")}\d{6,8}\z/.match(t) }
+ def existing_partitions(table, period = nil)
+ count =
+ case period
+ when "day"
+ 8
+ when "month"
+ 6
+ else
+ "6,8"
+ end
+
+ existing_tables(like: "#{table}_%").select { |t| /\A#{Regexp.escape("#{table}_")}\d{#{count}}\z/.match(t) }
end
def existing_tables(like:)
- query = "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = $1 AND tablename LIKE $2"
- execute(query, [schema, like]).map { |r| r["tablename"] }.sort
+ query = "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE schemaname = $1 AND tablename LIKE $2"
+ execute(query, like.split(".", 2)).map { |r| "#{r["schemaname"]}.#{r["tablename"]}" }.sort
end
def table_exists?(table)
existing_tables(like: table).any?
end
def columns(table)
- execute("SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2", [schema, table]).map{ |r| r["column_name"] }
+ execute("SELECT column_name FROM information_schema.columns WHERE table_schema || '.' || table_name = $1", [table]).map{ |r| r["column_name"] }
end
# http://stackoverflow.com/a/20537829
def primary_key(table)
query = <<-SQL
@@ -532,33 +547,31 @@
pg_attribute.attname,
format_type(pg_attribute.atttypid, pg_attribute.atttypmod)
FROM
pg_index, pg_class, pg_attribute, pg_namespace
WHERE
- relname = $2 AND
+ nspname || '.' || relname = $1 AND
indrelid = pg_class.oid AND
- nspname = $1 AND
pg_class.relnamespace = pg_namespace.oid AND
pg_attribute.attrelid = pg_class.oid AND
pg_attribute.attnum = any(pg_index.indkey) AND
indisprimary
SQL
- row = execute(query, [schema, table])[0]
- row && row["attname"]
+ execute(query, [table]).map { |r| r["attname"] }
end
def max_id(table, primary_key, below: nil, where: nil)
- query = "SELECT MAX(#{quote_ident(primary_key)}) FROM #{quote_ident(table)}"
+ query = "SELECT MAX(#{quote_ident(primary_key)}) FROM #{quote_table(table)}"
conditions = []
conditions << "#{quote_ident(primary_key)} <= #{below}" if below
conditions << where if where
query << " WHERE #{conditions.join(" AND ")}" if conditions.any?
execute(query)[0]["max"].to_i
end
def min_id(table, primary_key, column, cast, starting_time, where)
- query = "SELECT MIN(#{quote_ident(primary_key)}) FROM #{quote_ident(table)}"
+ query = "SELECT MIN(#{quote_ident(primary_key)}) FROM #{quote_table(table)}"
conditions = []
conditions << "#{quote_ident(column)} >= #{sql_date(starting_time, cast)}" if starting_time
conditions << where if where
query << " WHERE #{conditions.join(" AND ")}" if conditions.any?
(execute(query)[0]["min"] || 1).to_i
@@ -567,11 +580,11 @@
def has_trigger?(trigger_name, table)
!fetch_trigger(trigger_name, table).nil?
end
def fetch_comment(table)
- execute("SELECT obj_description(#{regclass(schema, table)}) AS comment")[0]
+ execute("SELECT obj_description(#{regclass(table)}) AS comment")[0]
end
# http://www.dbforums.com/showthread.php?1667561-How-to-list-sequences-and-the-columns-by-SQL
def sequences(table)
query = <<-SQL
@@ -591,11 +604,11 @@
end
# helpers
def trigger_name(table)
- "#{table}_insert_trigger"
+ "#{table.split(".")[-1]}_insert_trigger"
end
def intermediate_name(table)
"#{table}_intermediate"
end
@@ -603,11 +616,11 @@
def retired_name(table)
"#{table}_retired"
end
def column_cast(table, column)
- data_type = execute("SELECT data_type FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2 AND column_name = $3", [schema, table, column])[0]["data_type"]
+ data_type = execute("SELECT data_type FROM information_schema.columns WHERE table_schema || '.' || table_name = $1 AND column_name = $2", [table, column])[0]["data_type"]
data_type == "timestamp with time zone" ? "timestamptz" : "date"
end
def sql_date(time, cast, add_cast = true)
if cast == "timestamptz"
@@ -650,18 +663,30 @@
def quote_ident(value)
PG::Connection.quote_ident(value)
end
- def regclass(schema, table)
- "'#{quote_ident(schema)}.#{quote_ident(table)}'::regclass"
+ def quote_table(table)
+ table.split(".", 2).map { |v| quote_ident(v) }.join(".")
end
+ def quote_no_schema(table)
+ quote_ident(table.split(".", 2)[-1])
+ end
+
+ def regclass(table)
+ "'#{quote_table(table)}'::regclass"
+ end
+
def fetch_trigger(trigger_name, table)
- execute("SELECT obj_description(oid, 'pg_trigger') AS comment FROM pg_trigger WHERE tgname = $1 AND tgrelid = #{regclass(schema, table)}", [trigger_name])[0]
+ execute("SELECT obj_description(oid, 'pg_trigger') AS comment FROM pg_trigger WHERE tgname = $1 AND tgrelid = #{regclass(table)}", [trigger_name])[0]
end
+ def qualify_table(table)
+ table.to_s.include?(".") ? table : [schema, table].join(".")
+ end
+
def settings_from_trigger(original_table, table)
trigger_name = self.trigger_name(original_table)
needs_comment = false
trigger_comment = fetch_trigger(trigger_name, table)
@@ -690,10 +715,10 @@
[period, field, cast, needs_comment, !trigger_comment]
end
def foreign_keys(table)
- execute("SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = #{regclass(schema, table)} AND contype ='f'").map { |r| r["pg_get_constraintdef"] }
+ execute("SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = #{regclass(table)} AND contype ='f'").map { |r| r["pg_get_constraintdef"] }
end
def server_version_num
execute("SHOW server_version_num").first["server_version_num"].to_i
end