lib/pgslice.rb in pgslice-0.3.4 vs lib/pgslice.rb in pgslice-0.3.5
- old
+ new
@@ -71,33 +71,33 @@
end
queries = []
queries << <<-SQL
-CREATE TABLE #{intermediate_table} (LIKE #{table} INCLUDING ALL);
+CREATE TABLE #{quote_ident(intermediate_table)} (LIKE #{quote_ident(table)} INCLUDING ALL);
SQL
unless options[:no_partition]
sql_format = SQL_FORMAT[period.to_sym]
queries << <<-SQL
-CREATE FUNCTION #{trigger_name}()
+CREATE FUNCTION #{quote_ident(trigger_name)}()
RETURNS trigger AS $$
BEGIN
RAISE EXCEPTION 'Create partitions first.';
END;
$$ LANGUAGE plpgsql;
SQL
queries << <<-SQL
-CREATE TRIGGER #{trigger_name}
- BEFORE INSERT ON #{intermediate_table}
- FOR EACH ROW EXECUTE PROCEDURE #{trigger_name}();
+CREATE TRIGGER #{quote_ident(trigger_name)}
+ BEFORE INSERT ON #{quote_ident(intermediate_table)}
+ FOR EACH ROW EXECUTE PROCEDURE #{quote_ident(trigger_name)}();
SQL
cast = column_cast(table, column)
queries << <<-SQL
-COMMENT ON TRIGGER #{trigger_name} ON #{intermediate_table} is 'column:#{column},period:#{period},cast:#{cast}';
+COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_ident(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}';
SQL
end
run_queries(queries)
end
@@ -109,11 +109,11 @@
abort "Usage: pgslice unprep <table>" if arguments.length != 1
abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table)
queries = [
- "DROP TABLE #{intermediate_table} CASCADE;",
+ "DROP TABLE #{quote_ident(intermediate_table)} CASCADE;",
"DROP FUNCTION IF EXISTS #{trigger_name}();"
]
run_queries(queries)
end
@@ -130,20 +130,20 @@
range = (-1 * past)..future
# ensure table has trigger
abort "No trigger on table: #{table}\nDid you mean to use --intermediate?" unless has_trigger?(trigger_name, table)
- index_defs = execute("select pg_get_indexdef(indexrelid) from pg_index where indrelid = $1::regclass AND indisprimary = 'f'", [original_table]).map { |r| r["pg_get_indexdef"] }
+ index_defs = execute("SELECT pg_get_indexdef(indexrelid) FROM pg_index INNER JOIN pg_stat_user_indexes USING (indexrelid) WHERE relname = $1 AND schemaname = $2 AND indisprimary = 'f'", [original_table, schema]).map { |r| r["pg_get_indexdef"] }
primary_key = self.primary_key(table)
queries = []
period, field, cast, needs_comment = settings_from_trigger(original_table, table)
abort "Could not read settings" unless period
if needs_comment
- queries << "COMMENT ON TRIGGER #{trigger_name} ON #{table} is 'column:#{field},period:#{period},cast:#{cast}';"
+ queries << "COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_ident(table)} is 'column:#{field},period:#{period},cast:#{cast}';"
end
# today = utc date
today = round_date(DateTime.now.new_offset(0).to_date, period)
added_partitions = []
@@ -153,16 +153,16 @@
partition_name = "#{original_table}_#{day.strftime(name_format(period))}"
next if table_exists?(partition_name)
added_partitions << partition_name
queries << <<-SQL
-CREATE TABLE #{partition_name}
- (CHECK (#{field} >= #{sql_date(day, cast)} AND #{field} < #{sql_date(advance_date(day, period, 1), cast)}))
- INHERITS (#{table});
+CREATE TABLE #{quote_ident(partition_name)}
+ (CHECK (#{quote_ident(field)} >= #{sql_date(day, cast)} AND #{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}))
+ INHERITS (#{quote_ident(table)});
SQL
- queries << "ALTER TABLE #{partition_name} ADD PRIMARY KEY (#{primary_key});" if primary_key
+ queries << "ALTER TABLE #{quote_ident(partition_name)} ADD PRIMARY KEY (#{quote_ident(primary_key)});" if primary_key
index_defs.each do |index_def|
queries << index_def.sub(" ON #{original_table} USING ", " ON #{partition_name} USING ").sub(/ INDEX .+ ON /, " INDEX ON ") + ";"
end
end
@@ -177,12 +177,12 @@
existing_tables.each do |table|
day = DateTime.strptime(table.split("_").last, name_format)
partition_name = "#{original_table}_#{day.strftime(name_format(period))}"
- sql = "(NEW.#{field} >= #{sql_date(day, cast)} AND NEW.#{field} < #{sql_date(advance_date(day, period, 1), cast)}) THEN
- INSERT INTO #{partition_name} VALUES (NEW.*);"
+ sql = "(NEW.#{quote_ident(field)} >= #{sql_date(day, cast)} AND NEW.#{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}) THEN
+ INSERT INTO #{quote_ident(partition_name)} VALUES (NEW.*);"
if day.to_date < today
past_defs << sql
elsif advance_date(day, period, 1) < today
current_defs << sql
@@ -194,11 +194,11 @@
# order by current period, future periods asc, past periods desc
trigger_defs = current_defs + future_defs + past_defs.reverse
if trigger_defs.any?
queries << <<-SQL
-CREATE OR REPLACE FUNCTION #{trigger_name}()
+CREATE OR REPLACE FUNCTION #{quote_ident(trigger_name)}()
RETURNS trigger AS $$
BEGIN
IF #{trigger_defs.join("\n ELSIF ")}
ELSE
RAISE EXCEPTION 'Date out of range. Ensure partitions are created.';
@@ -260,33 +260,33 @@
min_source_id = min_id(source_table, primary_key, field, cast, starting_time, options[:where])
max_dest_id = min_source_id - 1 if min_source_id
end
starting_id = max_dest_id
- fields = columns(source_table).map { |c| PG::Connection.quote_ident(c) }.join(", ")
+ fields = columns(source_table).map { |c| quote_ident(c) }.join(", ")
batch_size = options[:batch_size]
i = 1
batch_count = ((max_source_id - starting_id) / batch_size.to_f).ceil
if batch_count == 0
log_sql "/* nothing to fill */"
end
while starting_id < max_source_id
- where = "#{primary_key} > #{starting_id} AND #{primary_key} <= #{starting_id + batch_size}"
+ where = "#{quote_ident(primary_key)} > #{starting_id} AND #{quote_ident(primary_key)} <= #{starting_id + batch_size}"
if starting_time
- where << " AND #{field} >= #{sql_date(starting_time, cast)} AND #{field} < #{sql_date(ending_time, cast)}"
+ where << " AND #{quote_ident(field)} >= #{sql_date(starting_time, cast)} AND #{quote_ident(field)} < #{sql_date(ending_time, cast)}"
end
if options[:where]
where << " AND #{options[:where]}"
end
query = <<-SQL
/* #{i} of #{batch_count} */
-INSERT INTO #{dest_table} (#{fields})
- SELECT #{fields} FROM #{source_table}
+INSERT INTO #{quote_ident(dest_table)} (#{fields})
+ SELECT #{fields} FROM #{quote_ident(source_table)}
WHERE #{where}
SQL
run_query(query)
@@ -308,16 +308,16 @@
abort "Table not found: #{table}" unless table_exists?(table)
abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table)
abort "Table already exists: #{retired_table}" if table_exists?(retired_table)
queries = [
- "ALTER TABLE #{table} RENAME TO #{retired_table};",
- "ALTER TABLE #{intermediate_table} RENAME TO #{table};"
+ "ALTER TABLE #{quote_ident(table)} RENAME TO #{quote_ident(retired_table)};",
+ "ALTER TABLE #{quote_ident(intermediate_table)} RENAME TO #{quote_ident(table)};"
]
self.sequences(table).each do |sequence|
- queries << "ALTER SEQUENCE #{sequence["sequence_name"]} OWNED BY #{table}.#{sequence["related_column"]};"
+ queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_name"])} OWNED BY #{table}.#{sequence["related_column"]};"
end
queries.unshift("SET LOCAL lock_timeout = '#{options[:lock_timeout]}';") if server_version_num >= 90300
run_queries(queries)
@@ -332,16 +332,16 @@
abort "Table not found: #{table}" unless table_exists?(table)
abort "Table not found: #{retired_table}" unless table_exists?(retired_table)
abort "Table already exists: #{intermediate_table}" if table_exists?(intermediate_table)
queries = [
- "ALTER TABLE #{table} RENAME TO #{intermediate_table};",
- "ALTER TABLE #{retired_table} RENAME TO #{table};"
+ "ALTER TABLE #{quote_ident(table)} RENAME TO #{quote_ident(intermediate_table)};",
+ "ALTER TABLE #{quote_ident(retired_table)} RENAME TO #{quote_ident(table)};"
]
self.sequences(table).each do |sequence|
- queries << "ALTER SEQUENCE #{sequence["sequence_name"]} OWNED BY #{table}.#{sequence["related_column"]};"
+ queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_name"])} OWNED BY #{table}.#{sequence["related_column"]};"
end
run_queries(queries)
end
@@ -351,11 +351,11 @@
abort "Usage: pgslice analyze <table>" if arguments.length != 1
existing_tables = self.existing_tables(like: "#{table}_%").select { |t| /\A#{Regexp.escape("#{table}_")}\d{6,8}\z/.match(t) }
analyze_list = existing_tables + [parent_table]
- run_queries_without_transaction analyze_list.map { |t| "ANALYZE VERBOSE #{t};" }
+ run_queries_without_transaction analyze_list.map { |t| "ANALYZE VERBOSE #{quote_ident(t)};" }
end
# arguments
def parse_args(args)
@@ -484,11 +484,11 @@
pg_attribute.attname,
format_type(pg_attribute.atttypid, pg_attribute.atttypmod)
FROM
pg_index, pg_class, pg_attribute, pg_namespace
WHERE
- pg_class.oid = $2::regclass AND
+ relname = $2 AND
indrelid = pg_class.oid AND
nspname = $1 AND
pg_class.relnamespace = pg_namespace.oid AND
pg_attribute.attrelid = pg_class.oid AND
pg_attribute.attnum = any(pg_index.indkey) AND
@@ -497,29 +497,29 @@
row = execute(query, [schema, table])[0]
row && row["attname"]
end
def max_id(table, primary_key, below: nil, where: nil)
- query = "SELECT MAX(#{primary_key}) FROM #{table}"
+ query = "SELECT MAX(#{quote_ident(primary_key)}) FROM #{quote_ident(table)}"
conditions = []
- conditions << "#{primary_key} <= #{below}" if below
+ conditions << "#{quote_ident(primary_key)} <= #{below}" if below
conditions << where if where
query << " WHERE #{conditions.join(" AND ")}" if conditions.any?
execute(query)[0]["max"].to_i
end
def min_id(table, primary_key, column, cast, starting_time, where)
- query = "SELECT MIN(#{primary_key}) FROM #{table}"
+ query = "SELECT MIN(#{quote_ident(primary_key)}) FROM #{quote_ident(table)}"
conditions = []
- conditions << "#{column} >= #{sql_date(starting_time, cast)}" if starting_time
+ conditions << "#{quote_ident(column)} >= #{sql_date(starting_time, cast)}" if starting_time
conditions << where if where
query << " WHERE #{conditions.join(" AND ")}" if conditions.any?
(execute(query)[0]["min"] || 1).to_i
end
def has_trigger?(trigger_name, table)
- execute("SELECT 1 FROM pg_trigger WHERE tgname = $1 AND tgrelid = $2::regclass", [trigger_name, table]).any?
+ !fetch_trigger(trigger_name, table).nil?
end
# http://www.dbforums.com/showthread.php?1667561-How-to-list-sequences-and-the-columns-by-SQL
def sequences(table)
query = <<-SQL
@@ -593,14 +593,22 @@
else
date.next_month(count)
end
end
+ def quote_ident(value)
+ PG::Connection.quote_ident(value)
+ end
+
+ def fetch_trigger(trigger_name, table)
+ execute("SELECT obj_description(oid, 'pg_trigger') AS comment FROM pg_trigger WHERE tgname = $1 AND EXISTS (SELECT 1 FROM pg_stat_user_tables WHERE relid = tgrelid AND relname = $2 AND schemaname = $3)", [trigger_name, table, schema])[0]
+ end
+
def settings_from_trigger(original_table, table)
trigger_name = self.trigger_name(original_table)
needs_comment = false
- comment = execute("SELECT obj_description(oid, 'pg_trigger') AS comment FROM pg_trigger WHERE tgname = $1 AND tgrelid = $2::regclass", [trigger_name, table])[0]
+ comment = fetch_trigger(trigger_name, table)
if comment
field, period, cast = comment["comment"].split(",").map { |v| v.split(":").last } rescue [nil, nil, nil]
end
unless period