lib/pgslice.rb in pgslice-0.3.6 vs lib/pgslice.rb in pgslice-0.4.0
- old
+ new
@@ -57,10 +57,11 @@
intermediate_table = "#{table}_intermediate"
trigger_name = self.trigger_name(table)
if options[:no_partition]
abort "Usage: pgslice prep <table> --no-partition" if arguments.length != 1
+ abort "Can't use --trigger-based and --no-partition" if options[:trigger_based]
else
abort "Usage: pgslice prep <table> <column> <period>" if arguments.length != 3
end
abort "Table not found: #{table}" unless table_exists?(table)
abort "Table already exists: #{intermediate_table}" if table_exists?(intermediate_table)
@@ -70,15 +71,33 @@
abort "Invalid period: #{period}" unless SQL_FORMAT[period.to_sym]
end
queries = []
- queries << <<-SQL
+ declarative = server_version_num >= 100000 && !options[:trigger_based]
+
+ if declarative && !options[:no_partition]
+ queries << <<-SQL
+CREATE TABLE #{quote_ident(intermediate_table)} (LIKE #{quote_ident(table)} INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING STORAGE INCLUDING COMMENTS) PARTITION BY RANGE (#{quote_ident(column)});
+ SQL
+
+ # add comment
+ cast = column_cast(table, column)
+ queries << <<-SQL
+COMMENT ON TABLE #{quote_ident(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}';
+ SQL
+ else
+ queries << <<-SQL
CREATE TABLE #{quote_ident(intermediate_table)} (LIKE #{quote_ident(table)} INCLUDING ALL);
- SQL
+ SQL
- unless options[:no_partition]
+ foreign_keys(table).each do |fk_def|
+ queries << "ALTER TABLE #{quote_ident(intermediate_table)} ADD #{fk_def};"
+ end
+ end
+
+ if !options[:no_partition] && !declarative
sql_format = SQL_FORMAT[period.to_sym]
queries << <<-SQL
CREATE FUNCTION #{quote_ident(trigger_name)}()
RETURNS trigger AS $$
BEGIN
@@ -89,16 +108,16 @@
queries << <<-SQL
CREATE TRIGGER #{quote_ident(trigger_name)}
BEFORE INSERT ON #{quote_ident(intermediate_table)}
FOR EACH ROW EXECUTE PROCEDURE #{quote_ident(trigger_name)}();
- SQL
+ SQL
cast = column_cast(table, column)
queries << <<-SQL
COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_ident(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}';
-SQL
+ SQL
end
run_queries(queries)
end
@@ -127,88 +146,111 @@
future = options[:future]
past = options[:past]
range = (-1 * past)..future
- # ensure table has trigger
- abort "No trigger on table: #{table}\nDid you mean to use --intermediate?" unless has_trigger?(trigger_name, table)
+ period, field, cast, needs_comment, declarative = settings_from_trigger(original_table, table)
+ unless period
+ message = "No settings found: #{table}"
+ message = "#{message}\nDid you mean to use --intermediate?" unless options[:intermediate]
+ abort message
+ end
- index_defs = execute("SELECT pg_get_indexdef(indexrelid) FROM pg_index WHERE indrelid = #{regclass(schema, original_table)} AND indisprimary = 'f'").map { |r| r["pg_get_indexdef"] }
- primary_key = self.primary_key(table)
-
queries = []
- period, field, cast, needs_comment = settings_from_trigger(original_table, table)
- abort "Could not read settings" unless period
-
if needs_comment
queries << "COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_ident(table)} is 'column:#{field},period:#{period},cast:#{cast}';"
end
# today = utc date
today = round_date(DateTime.now.new_offset(0).to_date, period)
+
+ schema_table =
+ if !declarative
+ table
+ elsif options[:intermediate]
+ original_table
+ else
+ "#{original_table}_#{today.strftime(name_format(period))}"
+ end
+ index_defs = execute("SELECT pg_get_indexdef(indexrelid) FROM pg_index WHERE indrelid = #{regclass(schema, schema_table)} AND indisprimary = 'f'").map { |r| r["pg_get_indexdef"] }
+ fk_defs = foreign_keys(schema_table)
+ primary_key = self.primary_key(schema_table)
+
added_partitions = []
range.each do |n|
day = advance_date(today, period, n)
partition_name = "#{original_table}_#{day.strftime(name_format(period))}"
next if table_exists?(partition_name)
added_partitions << partition_name
- queries << <<-SQL
+ if declarative
+ queries << <<-SQL
+CREATE TABLE #{quote_ident(partition_name)} PARTITION OF #{quote_ident(table)} FOR VALUES FROM (#{sql_date(day, cast, false)}) TO (#{sql_date(advance_date(day, period, 1), cast, false)});
+ SQL
+ else
+ queries << <<-SQL
CREATE TABLE #{quote_ident(partition_name)}
(CHECK (#{quote_ident(field)} >= #{sql_date(day, cast)} AND #{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}))
INHERITS (#{quote_ident(table)});
- SQL
+ SQL
+ end
queries << "ALTER TABLE #{quote_ident(partition_name)} ADD PRIMARY KEY (#{quote_ident(primary_key)});" if primary_key
index_defs.each do |index_def|
queries << index_def.sub(/ ON \S+ USING /, " ON #{quote_ident(partition_name)} USING ").sub(/ INDEX .+ ON /, " INDEX ON ") + ";"
end
+
+ fk_defs.each do |fk_def|
+ queries << "ALTER TABLE #{quote_ident(partition_name)} ADD #{fk_def};"
+ end
end
- # update trigger based on existing partitions
- current_defs = []
- future_defs = []
- past_defs = []
- name_format = self.name_format(period)
- existing_tables = existing_partitions(original_table)
- existing_tables = (existing_tables + added_partitions).uniq.sort
+ unless declarative
+ # update trigger based on existing partitions
+ current_defs = []
+ future_defs = []
+ past_defs = []
+ name_format = self.name_format(period)
+ existing_tables = existing_partitions(original_table)
+ existing_tables = (existing_tables + added_partitions).uniq.sort
- existing_tables.each do |table|
- day = DateTime.strptime(table.split("_").last, name_format)
- partition_name = "#{original_table}_#{day.strftime(name_format(period))}"
+ existing_tables.each do |table|
+ day = DateTime.strptime(table.split("_").last, name_format)
+ partition_name = "#{original_table}_#{day.strftime(name_format(period))}"
- sql = "(NEW.#{quote_ident(field)} >= #{sql_date(day, cast)} AND NEW.#{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}) THEN
- INSERT INTO #{quote_ident(partition_name)} VALUES (NEW.*);"
+ sql = "(NEW.#{quote_ident(field)} >= #{sql_date(day, cast)} AND NEW.#{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}) THEN
+ INSERT INTO #{quote_ident(partition_name)} VALUES (NEW.*);"
- if day.to_date < today
- past_defs << sql
- elsif advance_date(day, period, 1) < today
- current_defs << sql
- else
- future_defs << sql
+ if day.to_date < today
+ past_defs << sql
+ elsif advance_date(day, period, 1) < today
+ current_defs << sql
+ else
+ future_defs << sql
+ end
end
- end
- # order by current period, future periods asc, past periods desc
- trigger_defs = current_defs + future_defs + past_defs.reverse
+ # order by current period, future periods asc, past periods desc
+ trigger_defs = current_defs + future_defs + past_defs.reverse
- if trigger_defs.any?
- queries << <<-SQL
+ if trigger_defs.any?
+ queries << <<-SQL
CREATE OR REPLACE FUNCTION #{quote_ident(trigger_name)}()
RETURNS trigger AS $$
BEGIN
IF #{trigger_defs.join("\n ELSIF ")}
ELSE
RAISE EXCEPTION 'Date out of range. Ensure partitions are created.';
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
- SQL
+ SQL
+ end
end
run_queries(queries) if queries.any?
end
@@ -229,11 +271,11 @@
end
abort "Table not found: #{source_table}" unless table_exists?(source_table)
abort "Table not found: #{dest_table}" unless table_exists?(dest_table)
- period, field, cast, needs_comment = settings_from_trigger(table, dest_table)
+ period, field, cast, needs_comment, declarative = settings_from_trigger(table, dest_table)
if period
name_format = self.name_format(period)
existing_tables = existing_partitions(table)
@@ -241,11 +283,12 @@
starting_time = DateTime.strptime(existing_tables.first.split("_").last, name_format)
ending_time = advance_date(DateTime.strptime(existing_tables.last.split("_").last, name_format), period, 1)
end
end
- primary_key = self.primary_key(table)
+ schema_table = period && declarative ? existing_tables.last : table
+ primary_key = self.primary_key(schema_table)
abort "No primary key" unless primary_key
max_source_id = max_id(source_table, primary_key)
max_dest_id =
if options[:start]
@@ -366,10 +409,11 @@
o.integer "--future", default: 0
o.integer "--past", default: 0
o.integer "--batch-size", default: 10000
o.boolean "--dry-run", default: false
o.boolean "--no-partition", default: false
+ o.boolean "--trigger-based", default: false
o.integer "--start"
o.string "--url"
o.string "--source-table"
o.string "--dest-table"
o.string "--where"
@@ -522,10 +566,14 @@
# Reports whether fetch_trigger returns a non-nil result for the given
# trigger name and table. Always returns a strict boolean.
def has_trigger?(trigger_name, table)
  found = fetch_trigger(trigger_name, table)
  found.nil? ? false : true
end
# Runs a SELECT of obj_description(...) on the table's regclass expression and
# returns the first result row; the description is exposed under the "comment" key.
# NOTE(review): PostgreSQL documents the one-argument obj_description form as
# deprecated; passing the catalog name ('pg_class') may be preferable — confirm.
+ def fetch_comment(table)
+ execute("SELECT obj_description(#{regclass(schema, table)}) AS comment")[0]
+ end
+
# http://www.dbforums.com/showthread.php?1667561-How-to-list-sequences-and-the-columns-by-SQL
def sequences(table)
query = <<-SQL
SELECT
a.attname as related_column,
@@ -559,17 +607,18 @@
# Looks up the column's data_type in information_schema.columns (scoped to the
# current schema and the given table) and maps it to the cast name used in
# generated SQL: "timestamptz" for "timestamp with time zone", "date" for
# every other type.
def column_cast(table, column)
  lookup_sql = "SELECT data_type FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2 AND column_name = $3"
  row = execute(lookup_sql, [schema, table, column])[0]
  return "timestamptz" if row["data_type"] == "timestamp with time zone"

  "date"
end
# Renders `time` as a single-quoted SQL literal.
# When `cast` is "timestamptz" the literal carries the time of day followed by
# "UTC"; for any other cast it is date-only (YYYY-MM-DD).
# Old version: always appends "::<cast>" to the literal.
# New version: `add_cast` (default true) controls whether "::<cast>" is
# appended; with false the bare quoted literal is returned.
- def sql_date(time, cast)
+ def sql_date(time, cast, add_cast = true)
if cast == "timestamptz"
fmt = "%Y-%m-%d %H:%M:%S UTC"
else
fmt = "%Y-%m-%d"
end
- "'#{time.strftime(fmt)}'::#{cast}"
+ str = "'#{time.strftime(fmt)}'"
+ add_cast ? "#{str}::#{cast}" : str
end
def name_format(period)
case period.to_sym
when :day
@@ -613,22 +662,23 @@
def settings_from_trigger(original_table, table)
trigger_name = self.trigger_name(original_table)
needs_comment = false
- comment = fetch_trigger(trigger_name, table)
+ trigger_comment = fetch_trigger(trigger_name, table)
+ comment = trigger_comment || fetch_comment(table)
if comment
field, period, cast = comment["comment"].split(",").map { |v| v.split(":").last } rescue [nil, nil, nil]
end
unless period
needs_comment = true
function_def = execute("select pg_get_functiondef(oid) from pg_proc where proname = $1", [trigger_name])[0]
- return [nil, nil] unless function_def
+ return [] unless function_def
function_def = function_def["pg_get_functiondef"]
sql_format = SQL_FORMAT.find { |_, f| function_def.include?("'#{f}'") }
- return [nil, nil] unless sql_format
+ return [] unless sql_format
period = sql_format[0]
field = /to_char\(NEW\.(\w+),/.match(function_def)[1]
end
# backwards compatibility with 0.2.3 and earlier (pre-timestamptz support)
@@ -636,9 +686,17 @@
cast = "date"
# update comment to explicitly define cast
needs_comment = true
end
- [period, field, cast, needs_comment]
+ [period, field, cast, needs_comment, !trigger_comment]
+ end
+
# Returns an array with the pg_get_constraintdef text of every pg_constraint
# row whose conrelid is the given table and whose contype is 'f'.
# Callers in this file splice each entry into "ALTER TABLE ... ADD <def>;".
+ def foreign_keys(table)
+ execute("SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = #{regclass(schema, table)} AND contype ='f'").map { |r| r["pg_get_constraintdef"] }
+ end
+
# Returns the result of "SHOW server_version_num" converted with to_i.
# The prep code in this file compares it against 100000 when deciding whether
# to use declarative partitioning.
+ def server_version_num
+ execute("SHOW server_version_num").first["server_version_num"].to_i
end
end
end