lib/pgslice.rb in pgslice-0.4.0 vs lib/pgslice.rb in pgslice-0.4.1

- old
+ new

@@ -52,11 +52,13 @@ # commands def prep table, column, period = arguments + table = qualify_table(table) intermediate_table = "#{table}_intermediate" + trigger_name = self.trigger_name(table) if options[:no_partition] abort "Usage: pgslice prep <table> --no-partition" if arguments.length != 1 abort "Can't use --trigger-based and --no-partition" if options[:trigger_based] @@ -75,25 +77,25 @@ declarative = server_version_num >= 100000 && !options[:trigger_based] if declarative && !options[:no_partition] queries << <<-SQL -CREATE TABLE #{quote_ident(intermediate_table)} (LIKE #{quote_ident(table)} INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING STORAGE INCLUDING COMMENTS) PARTITION BY RANGE (#{quote_ident(column)}); +CREATE TABLE #{quote_table(intermediate_table)} (LIKE #{quote_table(table)} INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING STORAGE INCLUDING COMMENTS) PARTITION BY RANGE (#{quote_table(column)}); SQL # add comment cast = column_cast(table, column) queries << <<-SQL -COMMENT ON TABLE #{quote_ident(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}'; +COMMENT ON TABLE #{quote_table(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}'; SQL else queries << <<-SQL -CREATE TABLE #{quote_ident(intermediate_table)} (LIKE #{quote_ident(table)} INCLUDING ALL); +CREATE TABLE #{quote_table(intermediate_table)} (LIKE #{quote_table(table)} INCLUDING ALL); SQL foreign_keys(table).each do |fk_def| - queries << "ALTER TABLE #{quote_ident(intermediate_table)} ADD #{fk_def};" + queries << "ALTER TABLE #{quote_table(intermediate_table)} ADD #{fk_def};" end end if !options[:no_partition] && !declarative sql_format = SQL_FORMAT[period.to_sym] @@ -106,40 +108,40 @@ $$ LANGUAGE plpgsql; SQL queries << <<-SQL CREATE TRIGGER #{quote_ident(trigger_name)} - BEFORE INSERT ON #{quote_ident(intermediate_table)} + BEFORE INSERT ON #{quote_table(intermediate_table)} FOR EACH ROW EXECUTE PROCEDURE #{quote_ident(trigger_name)}(); SQL cast = column_cast(table, column) queries << <<-SQL -COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_ident(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}'; +COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_table(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}'; SQL end run_queries(queries) end def unprep - table = arguments.first + table = qualify_table(arguments.first) intermediate_table = "#{table}_intermediate" trigger_name = self.trigger_name(table) abort "Usage: pgslice unprep <table>" if arguments.length != 1 abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table) queries = [ - "DROP TABLE #{quote_ident(intermediate_table)} CASCADE;", + "DROP TABLE #{quote_table(intermediate_table)} CASCADE;", "DROP FUNCTION IF EXISTS #{quote_ident(trigger_name)}();" ] run_queries(queries) end def add_partitions - original_table = arguments.first + original_table = qualify_table(arguments.first) table = options[:intermediate] ? "#{original_table}_intermediate" : original_table trigger_name = self.trigger_name(original_table) abort "Usage: pgslice add_partitions <table>" if arguments.length != 1 abort "Table not found: #{table}" unless table_exists?(table) @@ -156,11 +158,11 @@ end queries = [] if needs_comment - queries << "COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_ident(table)} is 'column:#{field},period:#{period},cast:#{cast}';" + queries << "COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_table(table)} is 'column:#{field},period:#{period},cast:#{cast}';" end # today = utc date today = round_date(DateTime.now.new_offset(0).to_date, period) @@ -168,13 +170,14 @@ if !declarative table elsif options[:intermediate] original_table else - "#{original_table}_#{today.strftime(name_format(period))}" + existing_partitions(original_table, period).last end - index_defs = execute("SELECT pg_get_indexdef(indexrelid) FROM pg_index WHERE indrelid = #{regclass(schema, schema_table)} AND indisprimary = 'f'").map { |r| r["pg_get_indexdef"] } + + index_defs = execute("SELECT pg_get_indexdef(indexrelid) FROM pg_index WHERE indrelid = #{regclass(schema_table)} AND indisprimary = 'f'").map { |r| r["pg_get_indexdef"] } fk_defs = foreign_keys(schema_table) primary_key = self.primary_key(schema_table) added_partitions = [] range.each do |n| @@ -184,46 +187,46 @@ next if table_exists?(partition_name) added_partitions << partition_name if declarative queries << <<-SQL -CREATE TABLE #{quote_ident(partition_name)} PARTITION OF #{quote_ident(table)} FOR VALUES FROM (#{sql_date(day, cast, false)}) TO (#{sql_date(advance_date(day, period, 1), cast, false)}); +CREATE TABLE #{quote_table(partition_name)} PARTITION OF #{quote_table(table)} FOR VALUES FROM (#{sql_date(day, cast, false)}) TO (#{sql_date(advance_date(day, period, 1), cast, false)}); SQL else queries << <<-SQL -CREATE TABLE #{quote_ident(partition_name)} +CREATE TABLE #{quote_table(partition_name)} (CHECK (#{quote_ident(field)} >= #{sql_date(day, cast)} AND #{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)})) - INHERITS (#{quote_ident(table)}); + INHERITS (#{quote_table(table)}); SQL end - queries << "ALTER TABLE #{quote_ident(partition_name)} ADD PRIMARY KEY (#{quote_ident(primary_key)});" if primary_key + queries << "ALTER TABLE #{quote_table(partition_name)} ADD PRIMARY KEY (#{primary_key.map { |k| quote_ident(k) }.join(", ")});" if primary_key.any? index_defs.each do |index_def| - queries << index_def.sub(/ ON \S+ USING /, " ON #{quote_ident(partition_name)} USING ").sub(/ INDEX .+ ON /, " INDEX ON ") + ";" + queries << index_def.sub(/ ON \S+ USING /, " ON #{quote_table(partition_name)} USING ").sub(/ INDEX .+ ON /, " INDEX ON ") + ";" end fk_defs.each do |fk_def| - queries << "ALTER TABLE #{quote_ident(partition_name)} ADD #{fk_def};" + queries << "ALTER TABLE #{quote_table(partition_name)} ADD #{fk_def};" end end unless declarative # update trigger based on existing partitions current_defs = [] future_defs = [] past_defs = [] name_format = self.name_format(period) - existing_tables = existing_partitions(original_table) + existing_tables = existing_partitions(original_table, period) existing_tables = (existing_tables + added_partitions).uniq.sort existing_tables.each do |table| day = DateTime.strptime(table.split("_").last, name_format) partition_name = "#{original_table}_#{day.strftime(name_format(period))}" sql = "(NEW.#{quote_ident(field)} >= #{sql_date(day, cast)} AND NEW.#{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}) THEN - INSERT INTO #{quote_ident(partition_name)} VALUES (NEW.*);" + INSERT INTO #{quote_table(partition_name)} VALUES (NEW.*);" if day.to_date < today past_defs << sql elsif advance_date(day, period, 1) < today current_defs << sql @@ -253,11 +256,11 @@ run_queries(queries) if queries.any? end def fill - table = arguments.first + table = qualify_table(arguments.first) abort "Usage: pgslice fill <table>" if arguments.length != 1 source_table = options[:source_table] dest_table = options[:dest_table] @@ -276,20 +279,22 @@ period, field, cast, needs_comment, declarative = settings_from_trigger(table, dest_table) if period name_format = self.name_format(period) - existing_tables = existing_partitions(table) + existing_tables = existing_partitions(table, period) if existing_tables.any? starting_time = DateTime.strptime(existing_tables.first.split("_").last, name_format) ending_time = advance_date(DateTime.strptime(existing_tables.last.split("_").last, name_format), period, 1) end end schema_table = period && declarative ? existing_tables.last : table - primary_key = self.primary_key(schema_table) + + primary_key = self.primary_key(schema_table)[0] abort "No primary key" unless primary_key + max_source_id = max_id(source_table, primary_key) max_dest_id = if options[:start] options[:start] @@ -324,12 +329,12 @@ where << " AND #{options[:where]}" end query = <<-SQL /* #{i} of #{batch_count} */ -INSERT INTO #{quote_ident(dest_table)} (#{fields}) - SELECT #{fields} FROM #{quote_ident(source_table)} +INSERT INTO #{quote_table(dest_table)} (#{fields}) + SELECT #{fields} FROM #{quote_table(source_table)} WHERE #{where} SQL run_query(query) @@ -341,22 +346,22 @@ end end end def swap - table = arguments.first + table = qualify_table(arguments.first) intermediate_table = intermediate_name(table) retired_table = retired_name(table) abort "Usage: pgslice swap <table>" if arguments.length != 1 abort "Table not found: #{table}" unless table_exists?(table) abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table) abort "Table already exists: #{retired_table}" if table_exists?(retired_table) queries = [ - "ALTER TABLE #{quote_ident(table)} RENAME TO #{quote_ident(retired_table)};", - "ALTER TABLE #{quote_ident(intermediate_table)} RENAME TO #{quote_ident(table)};" + "ALTER TABLE #{quote_table(table)} RENAME TO #{quote_no_schema(retired_table)};", + "ALTER TABLE #{quote_table(intermediate_table)} RENAME TO #{quote_no_schema(table)};" ] self.sequences(table).each do |sequence| queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_name"])} OWNED BY #{quote_ident(table)}.#{quote_ident(sequence["related_column"])};" end @@ -365,40 +370,40 @@ run_queries(queries) end def unswap - table = arguments.first + table = qualify_table(arguments.first) intermediate_table = intermediate_name(table) retired_table = retired_name(table) abort "Usage: pgslice unswap <table>" if arguments.length != 1 abort "Table not found: #{table}" unless table_exists?(table) abort "Table not found: #{retired_table}" unless table_exists?(retired_table) abort "Table already exists: #{intermediate_table}" if table_exists?(intermediate_table) queries = [ - "ALTER TABLE #{quote_ident(table)} RENAME TO #{quote_ident(intermediate_table)};", - "ALTER TABLE #{quote_ident(retired_table)} RENAME TO #{quote_ident(table)};" + "ALTER TABLE #{quote_table(table)} RENAME TO #{quote_no_schema(intermediate_table)};", + "ALTER TABLE #{quote_table(retired_table)} RENAME TO #{quote_no_schema(table)};" ] self.sequences(table).each do |sequence| queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_name"])} OWNED BY #{quote_ident(table)}.#{quote_ident(sequence["related_column"])};" end run_queries(queries) end def analyze - table = arguments.first + table = qualify_table(arguments.first) parent_table = options[:swapped] ? table : intermediate_name(table) abort "Usage: pgslice analyze <table>" if arguments.length != 1 existing_tables = existing_partitions(table) analyze_list = existing_tables + [parent_table] - run_queries_without_transaction analyze_list.map { |t| "ANALYZE VERBOSE #{quote_ident(t)};" } + run_queries_without_transaction analyze_list.map { |t| "ANALYZE VERBOSE #{quote_table(t)};" } end # arguments def parse_args(args) @@ -506,25 +511,35 @@ def server_version_num execute("SHOW server_version_num")[0]["server_version_num"].to_i end - def existing_partitions(table) - existing_tables(like: "#{table}_%").select { |t| /\A#{Regexp.escape("#{table}_")}\d{6,8}\z/.match(t) } + def existing_partitions(table, period = nil) + count = + case period + when "day" + 8 + when "month" + 6 + else + "6,8" + end + + existing_tables(like: "#{table}_%").select { |t| /\A#{Regexp.escape("#{table}_")}\d{#{count}}\z/.match(t) } end def existing_tables(like:) - query = "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = $1 AND tablename LIKE $2" - execute(query, [schema, like]).map { |r| r["tablename"] }.sort + query = "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE schemaname = $1 AND tablename LIKE $2" + execute(query, like.split(".", 2)).map { |r| "#{r["schemaname"]}.#{r["tablename"]}" }.sort end def table_exists?(table) existing_tables(like: table).any? end def columns(table) - execute("SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2", [schema, table]).map{ |r| r["column_name"] } + execute("SELECT column_name FROM information_schema.columns WHERE table_schema || '.' || table_name = $1", [table]).map{ |r| r["column_name"] } end # http://stackoverflow.com/a/20537829 def primary_key(table) query = <<-SQL @@ -532,33 +547,31 @@ pg_attribute.attname, format_type(pg_attribute.atttypid, pg_attribute.atttypmod) FROM pg_index, pg_class, pg_attribute, pg_namespace WHERE - relname = $2 AND + nspname || '.' || relname = $1 AND indrelid = pg_class.oid AND - nspname = $1 AND pg_class.relnamespace = pg_namespace.oid AND pg_attribute.attrelid = pg_class.oid AND pg_attribute.attnum = any(pg_index.indkey) AND indisprimary SQL - row = execute(query, [schema, table])[0] - row && row["attname"] + execute(query, [table]).map { |r| r["attname"] } end def max_id(table, primary_key, below: nil, where: nil) - query = "SELECT MAX(#{quote_ident(primary_key)}) FROM #{quote_ident(table)}" + query = "SELECT MAX(#{quote_ident(primary_key)}) FROM #{quote_table(table)}" conditions = [] conditions << "#{quote_ident(primary_key)} <= #{below}" if below conditions << where if where query << " WHERE #{conditions.join(" AND ")}" if conditions.any? execute(query)[0]["max"].to_i end def min_id(table, primary_key, column, cast, starting_time, where) - query = "SELECT MIN(#{quote_ident(primary_key)}) FROM #{quote_ident(table)}" + query = "SELECT MIN(#{quote_ident(primary_key)}) FROM #{quote_table(table)}" conditions = [] conditions << "#{quote_ident(column)} >= #{sql_date(starting_time, cast)}" if starting_time conditions << where if where query << " WHERE #{conditions.join(" AND ")}" if conditions.any? (execute(query)[0]["min"] || 1).to_i @@ -567,11 +580,11 @@ def has_trigger?(trigger_name, table) !fetch_trigger(trigger_name, table).nil? end def fetch_comment(table) - execute("SELECT obj_description(#{regclass(schema, table)}) AS comment")[0] + execute("SELECT obj_description(#{regclass(table)}) AS comment")[0] end # http://www.dbforums.com/showthread.php?1667561-How-to-list-sequences-and-the-columns-by-SQL def sequences(table) query = <<-SQL @@ -591,11 +604,11 @@ end # helpers def trigger_name(table) - "#{table}_insert_trigger" + "#{table.split(".")[-1]}_insert_trigger" end def intermediate_name(table) "#{table}_intermediate" end @@ -603,11 +616,11 @@ def retired_name(table) "#{table}_retired" end def column_cast(table, column) - data_type = execute("SELECT data_type FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2 AND column_name = $3", [schema, table, column])[0]["data_type"] + data_type = execute("SELECT data_type FROM information_schema.columns WHERE table_schema || '.' || table_name = $1 AND column_name = $2", [table, column])[0]["data_type"] data_type == "timestamp with time zone" ? "timestamptz" : "date" end def sql_date(time, cast, add_cast = true) if cast == "timestamptz" @@ -650,18 +663,30 @@ def quote_ident(value) PG::Connection.quote_ident(value) end - def regclass(schema, table) - "'#{quote_ident(schema)}.#{quote_ident(table)}'::regclass" + def quote_table(table) + table.split(".", 2).map { |v| quote_ident(v) }.join(".") end + def quote_no_schema(table) + quote_ident(table.split(".", 2)[-1]) + end + + def regclass(table) + "'#{quote_table(table)}'::regclass" + end + def fetch_trigger(trigger_name, table) - execute("SELECT obj_description(oid, 'pg_trigger') AS comment FROM pg_trigger WHERE tgname = $1 AND tgrelid = #{regclass(schema, table)}", [trigger_name])[0] + execute("SELECT obj_description(oid, 'pg_trigger') AS comment FROM pg_trigger WHERE tgname = $1 AND tgrelid = #{regclass(table)}", [trigger_name])[0] end + def qualify_table(table) + table.to_s.include?(".") ? table : [schema, table].join(".") + end + def settings_from_trigger(original_table, table) trigger_name = self.trigger_name(original_table) needs_comment = false trigger_comment = fetch_trigger(trigger_name, table) @@ -690,10 +715,10 @@ [period, field, cast, needs_comment, !trigger_comment] end def foreign_keys(table) - execute("SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = #{regclass(schema, table)} AND contype ='f'").map { |r| r["pg_get_constraintdef"] } + execute("SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = #{regclass(table)} AND contype ='f'").map { |r| r["pg_get_constraintdef"] } end def server_version_num execute("SHOW server_version_num").first["server_version_num"].to_i end