lib/pgslice.rb in pgslice-0.3.4 vs lib/pgslice.rb in pgslice-0.3.5

- old
+ new

@@ -71,33 +71,33 @@ end queries = [] queries << <<-SQL -CREATE TABLE #{intermediate_table} (LIKE #{table} INCLUDING ALL); +CREATE TABLE #{quote_ident(intermediate_table)} (LIKE #{quote_ident(table)} INCLUDING ALL); SQL unless options[:no_partition] sql_format = SQL_FORMAT[period.to_sym] queries << <<-SQL -CREATE FUNCTION #{trigger_name}() +CREATE FUNCTION #{quote_ident(trigger_name)}() RETURNS trigger AS $$ BEGIN RAISE EXCEPTION 'Create partitions first.'; END; $$ LANGUAGE plpgsql; SQL queries << <<-SQL -CREATE TRIGGER #{trigger_name} - BEFORE INSERT ON #{intermediate_table} - FOR EACH ROW EXECUTE PROCEDURE #{trigger_name}(); +CREATE TRIGGER #{quote_ident(trigger_name)} + BEFORE INSERT ON #{quote_ident(intermediate_table)} + FOR EACH ROW EXECUTE PROCEDURE #{quote_ident(trigger_name)}(); SQL cast = column_cast(table, column) queries << <<-SQL -COMMENT ON TRIGGER #{trigger_name} ON #{intermediate_table} is 'column:#{column},period:#{period},cast:#{cast}'; +COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_ident(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}'; SQL end run_queries(queries) end @@ -109,11 +109,11 @@ abort "Usage: pgslice unprep <table>" if arguments.length != 1 abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table) queries = [ - "DROP TABLE #{intermediate_table} CASCADE;", + "DROP TABLE #{quote_ident(intermediate_table)} CASCADE;", "DROP FUNCTION IF EXISTS #{trigger_name}();" ] run_queries(queries) end @@ -130,20 +130,20 @@ range = (-1 * past)..future # ensure table has trigger abort "No trigger on table: #{table}\nDid you mean to use --intermediate?" unless has_trigger?(trigger_name, table) - index_defs = execute("select pg_get_indexdef(indexrelid) from pg_index where indrelid = $1::regclass AND indisprimary = 'f'", [original_table]).map { |r| r["pg_get_indexdef"] } + index_defs = execute("SELECT pg_get_indexdef(indexrelid) FROM pg_index INNER JOIN pg_stat_user_indexes USING (indexrelid) WHERE relname = $1 AND schemaname = $2 AND indisprimary = 'f'", [original_table, schema]).map { |r| r["pg_get_indexdef"] } primary_key = self.primary_key(table) queries = [] period, field, cast, needs_comment = settings_from_trigger(original_table, table) abort "Could not read settings" unless period if needs_comment - queries << "COMMENT ON TRIGGER #{trigger_name} ON #{table} is 'column:#{field},period:#{period},cast:#{cast}';" + queries << "COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_ident(table)} is 'column:#{field},period:#{period},cast:#{cast}';" end # today = utc date today = round_date(DateTime.now.new_offset(0).to_date, period) added_partitions = [] @@ -153,16 +153,16 @@ partition_name = "#{original_table}_#{day.strftime(name_format(period))}" next if table_exists?(partition_name) added_partitions << partition_name queries << <<-SQL -CREATE TABLE #{partition_name} - (CHECK (#{field} >= #{sql_date(day, cast)} AND #{field} < #{sql_date(advance_date(day, period, 1), cast)})) - INHERITS (#{table}); +CREATE TABLE #{quote_ident(partition_name)} + (CHECK (#{quote_ident(field)} >= #{sql_date(day, cast)} AND #{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)})) + INHERITS (#{quote_ident(table)}); SQL - queries << "ALTER TABLE #{partition_name} ADD PRIMARY KEY (#{primary_key});" if primary_key + queries << "ALTER TABLE #{quote_ident(partition_name)} ADD PRIMARY KEY (#{quote_ident(primary_key)});" if primary_key index_defs.each do |index_def| queries << index_def.sub(" ON #{original_table} USING ", " ON #{partition_name} USING ").sub(/ INDEX .+ ON /, " INDEX ON ") + ";" end end @@ -177,12 +177,12 @@ existing_tables.each do |table| day = DateTime.strptime(table.split("_").last, name_format) partition_name = "#{original_table}_#{day.strftime(name_format(period))}" - sql = "(NEW.#{field} >= #{sql_date(day, cast)} AND NEW.#{field} < #{sql_date(advance_date(day, period, 1), cast)}) THEN - INSERT INTO #{partition_name} VALUES (NEW.*);" + sql = "(NEW.#{quote_ident(field)} >= #{sql_date(day, cast)} AND NEW.#{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}) THEN + INSERT INTO #{quote_ident(partition_name)} VALUES (NEW.*);" if day.to_date < today past_defs << sql elsif advance_date(day, period, 1) < today current_defs << sql @@ -194,11 +194,11 @@ # order by current period, future periods asc, past periods desc trigger_defs = current_defs + future_defs + past_defs.reverse if trigger_defs.any? queries << <<-SQL -CREATE OR REPLACE FUNCTION #{trigger_name}() +CREATE OR REPLACE FUNCTION #{quote_ident(trigger_name)}() RETURNS trigger AS $$ BEGIN IF #{trigger_defs.join("\n ELSIF ")} ELSE RAISE EXCEPTION 'Date out of range. Ensure partitions are created.'; @@ -260,33 +260,33 @@ min_source_id = min_id(source_table, primary_key, field, cast, starting_time, options[:where]) max_dest_id = min_source_id - 1 if min_source_id end starting_id = max_dest_id - fields = columns(source_table).map { |c| PG::Connection.quote_ident(c) }.join(", ") + fields = columns(source_table).map { |c| quote_ident(c) }.join(", ") batch_size = options[:batch_size] i = 1 batch_count = ((max_source_id - starting_id) / batch_size.to_f).ceil if batch_count == 0 log_sql "/* nothing to fill */" end while starting_id < max_source_id - where = "#{primary_key} > #{starting_id} AND #{primary_key} <= #{starting_id + batch_size}" + where = "#{quote_ident(primary_key)} > #{starting_id} AND #{quote_ident(primary_key)} <= #{starting_id + batch_size}" if starting_time - where << " AND #{field} >= #{sql_date(starting_time, cast)} AND #{field} < #{sql_date(ending_time, cast)}" + where << " AND #{quote_ident(field)} >= #{sql_date(starting_time, cast)} AND #{quote_ident(field)} < #{sql_date(ending_time, cast)}" end if options[:where] where << " AND #{options[:where]}" end query = <<-SQL /* #{i} of #{batch_count} */ -INSERT INTO #{dest_table} (#{fields}) - SELECT #{fields} FROM #{source_table} +INSERT INTO #{quote_ident(dest_table)} (#{fields}) + SELECT #{fields} FROM #{quote_ident(source_table)} WHERE #{where} SQL run_query(query) @@ -308,16 +308,16 @@ abort "Table not found: #{table}" unless table_exists?(table) abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table) abort "Table already exists: #{retired_table}" if table_exists?(retired_table) queries = [ - "ALTER TABLE #{table} RENAME TO #{retired_table};", - "ALTER TABLE #{intermediate_table} RENAME TO #{table};" + "ALTER TABLE #{quote_ident(table)} RENAME TO #{quote_ident(retired_table)};", + "ALTER TABLE #{quote_ident(intermediate_table)} RENAME TO #{quote_ident(table)};" ] self.sequences(table).each do |sequence| - queries << "ALTER SEQUENCE #{sequence["sequence_name"]} OWNED BY #{table}.#{sequence["related_column"]};" + queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_name"])} OWNED BY #{table}.#{sequence["related_column"]};" end queries.unshift("SET LOCAL lock_timeout = '#{options[:lock_timeout]}';") if server_version_num >= 90300 run_queries(queries) @@ -332,16 +332,16 @@ abort "Table not found: #{table}" unless table_exists?(table) abort "Table not found: #{retired_table}" unless table_exists?(retired_table) abort "Table already exists: #{intermediate_table}" if table_exists?(intermediate_table) queries = [ - "ALTER TABLE #{table} RENAME TO #{intermediate_table};", - "ALTER TABLE #{retired_table} RENAME TO #{table};" + "ALTER TABLE #{quote_ident(table)} RENAME TO #{quote_ident(intermediate_table)};", + "ALTER TABLE #{quote_ident(retired_table)} RENAME TO #{quote_ident(table)};" ] self.sequences(table).each do |sequence| - queries << "ALTER SEQUENCE #{sequence["sequence_name"]} OWNED BY #{table}.#{sequence["related_column"]};" + queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_name"])} OWNED BY #{table}.#{sequence["related_column"]};" end run_queries(queries) end @@ -351,11 +351,11 @@ abort "Usage: pgslice analyze <table>" if arguments.length != 1 existing_tables = self.existing_tables(like: "#{table}_%").select { |t| /\A#{Regexp.escape("#{table}_")}\d{6,8}\z/.match(t) } analyze_list = existing_tables + [parent_table] - run_queries_without_transaction analyze_list.map { |t| "ANALYZE VERBOSE #{t};" } + run_queries_without_transaction analyze_list.map { |t| "ANALYZE VERBOSE #{quote_ident(t)};" } end # arguments def parse_args(args) @@ -484,11 +484,11 @@ pg_attribute.attname, format_type(pg_attribute.atttypid, pg_attribute.atttypmod) FROM pg_index, pg_class, pg_attribute, pg_namespace WHERE - pg_class.oid = $2::regclass AND + relname = $2 AND indrelid = pg_class.oid AND nspname = $1 AND pg_class.relnamespace = pg_namespace.oid AND pg_attribute.attrelid = pg_class.oid AND pg_attribute.attnum = any(pg_index.indkey) AND @@ -497,29 +497,29 @@ row = execute(query, [schema, table])[0] row && row["attname"] end def max_id(table, primary_key, below: nil, where: nil) - query = "SELECT MAX(#{primary_key}) FROM #{table}" + query = "SELECT MAX(#{quote_ident(primary_key)}) FROM #{quote_ident(table)}" conditions = [] - conditions << "#{primary_key} <= #{below}" if below + conditions << "#{quote_ident(primary_key)} <= #{below}" if below conditions << where if where query << " WHERE #{conditions.join(" AND ")}" if conditions.any? execute(query)[0]["max"].to_i end def min_id(table, primary_key, column, cast, starting_time, where) - query = "SELECT MIN(#{primary_key}) FROM #{table}" + query = "SELECT MIN(#{quote_ident(primary_key)}) FROM #{quote_ident(table)}" conditions = [] - conditions << "#{column} >= #{sql_date(starting_time, cast)}" if starting_time + conditions << "#{quote_ident(column)} >= #{sql_date(starting_time, cast)}" if starting_time conditions << where if where query << " WHERE #{conditions.join(" AND ")}" if conditions.any? (execute(query)[0]["min"] || 1).to_i end def has_trigger?(trigger_name, table) - execute("SELECT 1 FROM pg_trigger WHERE tgname = $1 AND tgrelid = $2::regclass", [trigger_name, table]).any? + !fetch_trigger(trigger_name, table).nil? end # http://www.dbforums.com/showthread.php?1667561-How-to-list-sequences-and-the-columns-by-SQL def sequences(table) query = <<-SQL @@ -593,14 +593,22 @@ else date.next_month(count) end end + def quote_ident(value) + PG::Connection.quote_ident(value) + end + + def fetch_trigger(trigger_name, table) + execute("SELECT obj_description(oid, 'pg_trigger') AS comment FROM pg_trigger WHERE tgname = $1 AND EXISTS (SELECT 1 FROM pg_stat_user_tables WHERE relid = tgrelid AND relname = $2 AND schemaname = $3)", [trigger_name, table, schema])[0] + end + def settings_from_trigger(original_table, table) trigger_name = self.trigger_name(original_table) needs_comment = false - comment = execute("SELECT obj_description(oid, 'pg_trigger') AS comment FROM pg_trigger WHERE tgname = $1 AND tgrelid = $2::regclass", [trigger_name, table])[0] + comment = fetch_trigger(trigger_name, table) if comment field, period, cast = comment["comment"].split(",").map { |v| v.split(":").last } rescue [nil, nil, nil] end unless period