lib/pgslice.rb in pgslice-0.3.6 vs lib/pgslice.rb in pgslice-0.4.0

- old
+ new

@@ -57,10 +57,11 @@ intermediate_table = "#{table}_intermediate" trigger_name = self.trigger_name(table) if options[:no_partition] abort "Usage: pgslice prep <table> --no-partition" if arguments.length != 1 + abort "Can't use --trigger-based and --no-partition" if options[:trigger_based] else abort "Usage: pgslice prep <table> <column> <period>" if arguments.length != 3 end abort "Table not found: #{table}" unless table_exists?(table) abort "Table already exists: #{intermediate_table}" if table_exists?(intermediate_table) @@ -70,15 +71,33 @@ abort "Invalid period: #{period}" unless SQL_FORMAT[period.to_sym] end queries = [] - queries << <<-SQL + declarative = server_version_num >= 100000 && !options[:trigger_based] + + if declarative && !options[:no_partition] + queries << <<-SQL +CREATE TABLE #{quote_ident(intermediate_table)} (LIKE #{quote_ident(table)} INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING STORAGE INCLUDING COMMENTS) PARTITION BY RANGE (#{quote_ident(column)}); + SQL + + # add comment + cast = column_cast(table, column) + queries << <<-SQL +COMMENT ON TABLE #{quote_ident(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}'; + SQL + else + queries << <<-SQL CREATE TABLE #{quote_ident(intermediate_table)} (LIKE #{quote_ident(table)} INCLUDING ALL); - SQL + SQL - unless options[:no_partition] + foreign_keys(table).each do |fk_def| + queries << "ALTER TABLE #{quote_ident(intermediate_table)} ADD #{fk_def};" + end + end + + if !options[:no_partition] && !declarative sql_format = SQL_FORMAT[period.to_sym] queries << <<-SQL CREATE FUNCTION #{quote_ident(trigger_name)}() RETURNS trigger AS $$ BEGIN @@ -89,16 +108,16 @@ queries << <<-SQL CREATE TRIGGER #{quote_ident(trigger_name)} BEFORE INSERT ON #{quote_ident(intermediate_table)} FOR EACH ROW EXECUTE PROCEDURE #{quote_ident(trigger_name)}(); - SQL + SQL cast = column_cast(table, column) queries << <<-SQL COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_ident(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}'; -SQL + SQL end run_queries(queries) end @@ -127,88 +146,111 @@ future = options[:future] past = options[:past] range = (-1 * past)..future - # ensure table has trigger - abort "No trigger on table: #{table}\nDid you mean to use --intermediate?" unless has_trigger?(trigger_name, table) + period, field, cast, needs_comment, declarative = settings_from_trigger(original_table, table) + unless period + message = "No settings found: #{table}" + message = "#{message}\nDid you mean to use --intermediate?" unless options[:intermediate] + abort message + end - index_defs = execute("SELECT pg_get_indexdef(indexrelid) FROM pg_index WHERE indrelid = #{regclass(schema, original_table)} AND indisprimary = 'f'").map { |r| r["pg_get_indexdef"] } - primary_key = self.primary_key(table) - queries = [] - period, field, cast, needs_comment = settings_from_trigger(original_table, table) - abort "Could not read settings" unless period - if needs_comment queries << "COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_ident(table)} is 'column:#{field},period:#{period},cast:#{cast}';" end # today = utc date today = round_date(DateTime.now.new_offset(0).to_date, period) + + schema_table = + if !declarative + table + elsif options[:intermediate] + original_table + else + "#{original_table}_#{today.strftime(name_format(period))}" + end + index_defs = execute("SELECT pg_get_indexdef(indexrelid) FROM pg_index WHERE indrelid = #{regclass(schema, schema_table)} AND indisprimary = 'f'").map { |r| r["pg_get_indexdef"] } + fk_defs = foreign_keys(schema_table) + primary_key = self.primary_key(schema_table) + added_partitions = [] range.each do |n| day = advance_date(today, period, n) partition_name = "#{original_table}_#{day.strftime(name_format(period))}" next if table_exists?(partition_name) added_partitions << partition_name - queries << <<-SQL + if declarative + queries << <<-SQL +CREATE TABLE #{quote_ident(partition_name)} PARTITION OF #{quote_ident(table)} FOR VALUES FROM (#{sql_date(day, cast, false)}) TO (#{sql_date(advance_date(day, period, 1), cast, false)}); + SQL + else + queries << <<-SQL CREATE TABLE #{quote_ident(partition_name)} (CHECK (#{quote_ident(field)} >= #{sql_date(day, cast)} AND #{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)})) INHERITS (#{quote_ident(table)}); - SQL + SQL + end queries << "ALTER TABLE #{quote_ident(partition_name)} ADD PRIMARY KEY (#{quote_ident(primary_key)});" if primary_key index_defs.each do |index_def| queries << index_def.sub(/ ON \S+ USING /, " ON #{quote_ident(partition_name)} USING ").sub(/ INDEX .+ ON /, " INDEX ON ") + ";" end + + fk_defs.each do |fk_def| + queries << "ALTER TABLE #{quote_ident(partition_name)} ADD #{fk_def};" + end end - # update trigger based on existing partitions - current_defs = [] - future_defs = [] - past_defs = [] - name_format = self.name_format(period) - existing_tables = existing_partitions(original_table) - existing_tables = (existing_tables + added_partitions).uniq.sort + unless declarative + # update trigger based on existing partitions + current_defs = [] + future_defs = [] + past_defs = [] + name_format = self.name_format(period) + existing_tables = existing_partitions(original_table) + existing_tables = (existing_tables + added_partitions).uniq.sort - existing_tables.each do |table| - day = DateTime.strptime(table.split("_").last, name_format) - partition_name = "#{original_table}_#{day.strftime(name_format(period))}" + existing_tables.each do |table| + day = DateTime.strptime(table.split("_").last, name_format) + partition_name = "#{original_table}_#{day.strftime(name_format(period))}" - sql = "(NEW.#{quote_ident(field)} >= #{sql_date(day, cast)} AND NEW.#{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}) THEN - INSERT INTO #{quote_ident(partition_name)} VALUES (NEW.*);" + sql = "(NEW.#{quote_ident(field)} >= #{sql_date(day, cast)} AND NEW.#{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}) THEN + INSERT INTO #{quote_ident(partition_name)} VALUES (NEW.*);" - if day.to_date < today - past_defs << sql - elsif advance_date(day, period, 1) < today - current_defs << sql - else - future_defs << sql + if day.to_date < today + past_defs << sql + elsif advance_date(day, period, 1) < today + current_defs << sql + else + future_defs << sql + end end - end - # order by current period, future periods asc, past periods desc - trigger_defs = current_defs + future_defs + past_defs.reverse + # order by current period, future periods asc, past periods desc + trigger_defs = current_defs + future_defs + past_defs.reverse - if trigger_defs.any? - queries << <<-SQL + if trigger_defs.any? + queries << <<-SQL CREATE OR REPLACE FUNCTION #{quote_ident(trigger_name)}() RETURNS trigger AS $$ BEGIN IF #{trigger_defs.join("\n ELSIF ")} ELSE RAISE EXCEPTION 'Date out of range. Ensure partitions are created.'; END IF; RETURN NULL; END; $$ LANGUAGE plpgsql; - SQL + SQL + end end run_queries(queries) if queries.any? end @@ -229,11 +271,11 @@ end abort "Table not found: #{source_table}" unless table_exists?(source_table) abort "Table not found: #{dest_table}" unless table_exists?(dest_table) - period, field, cast, needs_comment = settings_from_trigger(table, dest_table) + period, field, cast, needs_comment, declarative = settings_from_trigger(table, dest_table) if period name_format = self.name_format(period) existing_tables = existing_partitions(table) @@ -241,11 +283,12 @@ starting_time = DateTime.strptime(existing_tables.first.split("_").last, name_format) ending_time = advance_date(DateTime.strptime(existing_tables.last.split("_").last, name_format), period, 1) end end - primary_key = self.primary_key(table) + schema_table = period && declarative ? existing_tables.last : table + primary_key = self.primary_key(schema_table) abort "No primary key" unless primary_key max_source_id = max_id(source_table, primary_key) max_dest_id = if options[:start] @@ -366,10 +409,11 @@ o.integer "--future", default: 0 o.integer "--past", default: 0 o.integer "--batch-size", default: 10000 o.boolean "--dry-run", default: false o.boolean "--no-partition", default: false + o.boolean "--trigger-based", default: false o.integer "--start" o.string "--url" o.string "--source-table" o.string "--dest-table" o.string "--where" @@ -522,10 +566,14 @@ def has_trigger?(trigger_name, table) !fetch_trigger(trigger_name, table).nil? end + def fetch_comment(table) + execute("SELECT obj_description(#{regclass(schema, table)}) AS comment")[0] + end + # http://www.dbforums.com/showthread.php?1667561-How-to-list-sequences-and-the-columns-by-SQL def sequences(table) query = <<-SQL SELECT a.attname as related_column, @@ -559,17 +607,18 @@ def column_cast(table, column) data_type = execute("SELECT data_type FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2 AND column_name = $3", [schema, table, column])[0]["data_type"] data_type == "timestamp with time zone" ? "timestamptz" : "date" end - def sql_date(time, cast) + def sql_date(time, cast, add_cast = true) if cast == "timestamptz" fmt = "%Y-%m-%d %H:%M:%S UTC" else fmt = "%Y-%m-%d" end - "'#{time.strftime(fmt)}'::#{cast}" + str = "'#{time.strftime(fmt)}'" + add_cast ? "#{str}::#{cast}" : str end def name_format(period) case period.to_sym when :day @@ -613,22 +662,23 @@ def settings_from_trigger(original_table, table) trigger_name = self.trigger_name(original_table) needs_comment = false - comment = fetch_trigger(trigger_name, table) + trigger_comment = fetch_trigger(trigger_name, table) + comment = trigger_comment || fetch_comment(table) if comment field, period, cast = comment["comment"].split(",").map { |v| v.split(":").last } rescue [nil, nil, nil] end unless period needs_comment = true function_def = execute("select pg_get_functiondef(oid) from pg_proc where proname = $1", [trigger_name])[0] - return [nil, nil] unless function_def + return [] unless function_def function_def = function_def["pg_get_functiondef"] sql_format = SQL_FORMAT.find { |_, f| function_def.include?("'#{f}'") } - return [nil, nil] unless sql_format + return [] unless sql_format period = sql_format[0] field = /to_char\(NEW\.(\w+),/.match(function_def)[1] end # backwards compatibility with 0.2.3 and earlier (pre-timestamptz support) @@ -636,9 +686,17 @@ cast = "date" # update comment to explicitly define cast needs_comment = true end - [period, field, cast, needs_comment] + [period, field, cast, needs_comment, !trigger_comment] + end + + def foreign_keys(table) + execute("SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = #{regclass(schema, table)} AND contype ='f'").map { |r| r["pg_get_constraintdef"] } + end + + def server_version_num + execute("SHOW server_version_num").first["server_version_num"].to_i end end end