lib/pgslice.rb in pgslice-0.4.2 vs lib/pgslice.rb in pgslice-0.4.3
- old
+ new
@@ -1,744 +1,10 @@
-require "pgslice/version"
-require "slop"
-require "pg"
+# dependencies
require "cgi"
+require "thor"
+require "pg"
-module PgSlice
- class Error < StandardError; end
-
- class Client
- attr_reader :arguments, :options
-
- SQL_FORMAT = {
- day: "YYYYMMDD",
- month: "YYYYMM"
- }
-
- def initialize(args)
- $stdout.sync = true
- $stderr.sync = true
- parse_args(args)
- @command = @arguments.shift
- end
-
- def perform
- return if @exit
-
- case @command
- when "prep"
- prep
- when "add_partitions"
- add_partitions
- when "fill"
- fill
- when "swap"
- swap
- when "unswap"
- unswap
- when "unprep"
- unprep
- when "analyze"
- analyze
- when nil
- log "Commands: add_partitions, analyze, fill, prep, swap, unprep, unswap"
- else
- abort "Unknown command: #{@command}"
- end
- ensure
- @connection.close if @connection
- end
-
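The command dispatch above is the whole public surface of the 0.4.2 client: construct with ARGV, then call perform. A minimal driver in that style (a sketch only; the gem's actual executable may differ) looks like:

    require "pgslice"

    begin
      PgSlice::Client.new(ARGV).perform
    rescue PgSlice::Error => e
      abort e.message
    end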
- protected
-
- # commands
-
- def prep
- table, column, period = arguments
- table = qualify_table(table)
- intermediate_table = "#{table}_intermediate"
-
- trigger_name = self.trigger_name(table)
-
- if options[:no_partition]
- abort "Usage: pgslice prep <table> --no-partition" if arguments.length != 1
- abort "Can't use --trigger-based and --no-partition" if options[:trigger_based]
- else
- abort "Usage: pgslice prep <table> <column> <period>" if arguments.length != 3
- end
- abort "Table not found: #{table}" unless table_exists?(table)
- abort "Table already exists: #{intermediate_table}" if table_exists?(intermediate_table)
-
- unless options[:no_partition]
- abort "Column not found: #{column}" unless columns(table).include?(column)
- abort "Invalid period: #{period}" unless SQL_FORMAT[period.to_sym]
- end
-
- queries = []
-
- declarative = server_version_num >= 100000 && !options[:trigger_based]
-
- if declarative && !options[:no_partition]
- queries << <<-SQL
-CREATE TABLE #{quote_table(intermediate_table)} (LIKE #{quote_table(table)} INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING STORAGE INCLUDING COMMENTS) PARTITION BY RANGE (#{quote_table(column)});
- SQL
-
- if server_version_num >= 110000
- index_defs = execute("SELECT pg_get_indexdef(indexrelid) FROM pg_index WHERE indrelid = #{regclass(table)} AND indisprimary = 'f'").map { |r| r["pg_get_indexdef"] }
- index_defs.each do |index_def|
- queries << index_def.sub(/ ON \S+ USING /, " ON #{quote_table(intermediate_table)} USING ").sub(/ INDEX .+ ON /, " INDEX ON ") + ";"
- end
- end
-
- # add comment
- cast = column_cast(table, column)
- queries << <<-SQL
-COMMENT ON TABLE #{quote_table(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}';
- SQL
- else
- queries << <<-SQL
-CREATE TABLE #{quote_table(intermediate_table)} (LIKE #{quote_table(table)} INCLUDING ALL);
- SQL
-
- foreign_keys(table).each do |fk_def|
- queries << "ALTER TABLE #{quote_table(intermediate_table)} ADD #{fk_def};"
- end
- end
-
- if !options[:no_partition] && !declarative
- sql_format = SQL_FORMAT[period.to_sym]
- queries << <<-SQL
-CREATE FUNCTION #{quote_ident(trigger_name)}()
- RETURNS trigger AS $$
- BEGIN
- RAISE EXCEPTION 'Create partitions first.';
- END;
- $$ LANGUAGE plpgsql;
- SQL
-
- queries << <<-SQL
-CREATE TRIGGER #{quote_ident(trigger_name)}
- BEFORE INSERT ON #{quote_table(intermediate_table)}
- FOR EACH ROW EXECUTE PROCEDURE #{quote_ident(trigger_name)}();
- SQL
-
- cast = column_cast(table, column)
- queries << <<-SQL
-COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_table(intermediate_table)} is 'column:#{column},period:#{period},cast:#{cast}';
- SQL
- end
-
- run_queries(queries)
- end
-
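The key decision in prep is whether to use declarative partitioning or the older trigger-based setup. Isolated, the check above reduces to the following (the version number and flag value are illustrative):

    # declarative partitioning needs Postgres 10+ and no --trigger-based flag
    server_version_num = 110_000   # e.g. Postgres 11
    trigger_based      = false
    declarative = server_version_num >= 100_000 && !trigger_based
    # => true; on 9.6 (90600) or with --trigger-based, prep falls back to an
    #    inheritance-style intermediate table plus an INSERT trigger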
- def unprep
- table = qualify_table(arguments.first)
- intermediate_table = "#{table}_intermediate"
- trigger_name = self.trigger_name(table)
-
- abort "Usage: pgslice unprep <table>" if arguments.length != 1
- abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table)
-
- queries = [
- "DROP TABLE #{quote_table(intermediate_table)} CASCADE;",
- "DROP FUNCTION IF EXISTS #{quote_ident(trigger_name)}();"
- ]
- run_queries(queries)
- end
-
- def add_partitions
- original_table = qualify_table(arguments.first)
- table = options[:intermediate] ? "#{original_table}_intermediate" : original_table
- trigger_name = self.trigger_name(original_table)
-
- abort "Usage: pgslice add_partitions <table>" if arguments.length != 1
- abort "Table not found: #{table}" unless table_exists?(table)
-
- future = options[:future]
- past = options[:past]
- range = (-1 * past)..future
-
- period, field, cast, needs_comment, declarative = settings_from_trigger(original_table, table)
- unless period
- message = "No settings found: #{table}"
- message = "#{message}\nDid you mean to use --intermediate?" unless options[:intermediate]
- abort message
- end
-
- queries = []
-
- if needs_comment
- queries << "COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_table(table)} is 'column:#{field},period:#{period},cast:#{cast}';"
- end
-
- # today = utc date
- today = round_date(DateTime.now.new_offset(0).to_date, period)
-
- schema_table =
- if !declarative
- table
- elsif options[:intermediate]
- original_table
- else
- existing_partitions(original_table, period).last
- end
-
- # indexes automatically propagate in Postgres 11+
- index_defs =
- if !declarative || server_version_num < 110000
- execute("SELECT pg_get_indexdef(indexrelid) FROM pg_index WHERE indrelid = #{regclass(schema_table)} AND indisprimary = 'f'").map { |r| r["pg_get_indexdef"] }
- else
- []
- end
-
- fk_defs = foreign_keys(schema_table)
- primary_key = self.primary_key(schema_table)
-
- added_partitions = []
- range.each do |n|
- day = advance_date(today, period, n)
-
- partition_name = "#{original_table}_#{day.strftime(name_format(period))}"
- next if table_exists?(partition_name)
- added_partitions << partition_name
-
- if declarative
- queries << <<-SQL
-CREATE TABLE #{quote_table(partition_name)} PARTITION OF #{quote_table(table)} FOR VALUES FROM (#{sql_date(day, cast, false)}) TO (#{sql_date(advance_date(day, period, 1), cast, false)});
- SQL
- else
- queries << <<-SQL
-CREATE TABLE #{quote_table(partition_name)}
- (CHECK (#{quote_ident(field)} >= #{sql_date(day, cast)} AND #{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}))
- INHERITS (#{quote_table(table)});
- SQL
- end
-
- queries << "ALTER TABLE #{quote_table(partition_name)} ADD PRIMARY KEY (#{primary_key.map { |k| quote_ident(k) }.join(", ")});" if primary_key.any?
-
- index_defs.each do |index_def|
- queries << index_def.sub(/ ON \S+ USING /, " ON #{quote_table(partition_name)} USING ").sub(/ INDEX .+ ON /, " INDEX ON ") + ";"
- end
-
- fk_defs.each do |fk_def|
- queries << "ALTER TABLE #{quote_table(partition_name)} ADD #{fk_def};"
- end
- end
-
- unless declarative
- # update trigger based on existing partitions
- current_defs = []
- future_defs = []
- past_defs = []
- name_format = self.name_format(period)
- existing_tables = existing_partitions(original_table, period)
- existing_tables = (existing_tables + added_partitions).uniq.sort
-
- existing_tables.each do |table|
- day = DateTime.strptime(table.split("_").last, name_format)
- partition_name = "#{original_table}_#{day.strftime(name_format(period))}"
-
- sql = "(NEW.#{quote_ident(field)} >= #{sql_date(day, cast)} AND NEW.#{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}) THEN
- INSERT INTO #{quote_table(partition_name)} VALUES (NEW.*);"
-
- if day.to_date < today
- past_defs << sql
- elsif advance_date(day, period, 1) < today
- current_defs << sql
- else
- future_defs << sql
- end
- end
-
- # order by current period, future periods asc, past periods desc
- trigger_defs = current_defs + future_defs + past_defs.reverse
-
- if trigger_defs.any?
- queries << <<-SQL
-CREATE OR REPLACE FUNCTION #{quote_ident(trigger_name)}()
- RETURNS trigger AS $$
- BEGIN
- IF #{trigger_defs.join("\n ELSIF ")}
- ELSE
- RAISE EXCEPTION 'Date out of range. Ensure partitions are created.';
- END IF;
- RETURN NULL;
- END;
- $$ LANGUAGE plpgsql;
- SQL
- end
- end
-
- run_queries(queries) if queries.any?
- end
-
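Partition names are derived purely from date arithmetic: round today down to the period boundary, step forward or back by whole periods, and append the formatted suffix. The same math with plain Date calls (table name and dates are illustrative):

    require "date"

    today = Date.new(2018, 12, 14)
    start = Date.new(today.year, today.month)     # round_date(..., :month)
    names = (-1..2).map do |n|
      day = start.next_month(n)                   # advance_date(..., :month, n)
      "visits_#{day.strftime("%Y%m")}"            # name_format for :month
    end
    # names => ["visits_201811", "visits_201812", "visits_201901", "visits_201902"]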
- def fill
- table = qualify_table(arguments.first)
-
- abort "Usage: pgslice fill <table>" if arguments.length != 1
-
- source_table = options[:source_table]
- dest_table = options[:dest_table]
-
- if options[:swapped]
- source_table ||= retired_name(table)
- dest_table ||= table
- else
- source_table ||= table
- dest_table ||= intermediate_name(table)
- end
-
- abort "Table not found: #{source_table}" unless table_exists?(source_table)
- abort "Table not found: #{dest_table}" unless table_exists?(dest_table)
-
- period, field, cast, needs_comment, declarative = settings_from_trigger(table, dest_table)
-
- if period
- name_format = self.name_format(period)
-
- existing_tables = existing_partitions(table, period)
- if existing_tables.any?
- starting_time = DateTime.strptime(existing_tables.first.split("_").last, name_format)
- ending_time = advance_date(DateTime.strptime(existing_tables.last.split("_").last, name_format), period, 1)
- end
- end
-
- schema_table = period && declarative ? existing_tables.last : table
-
- primary_key = self.primary_key(schema_table)[0]
- abort "No primary key" unless primary_key
-
- max_source_id = nil
- begin
- max_source_id = max_id(source_table, primary_key)
- rescue PG::UndefinedFunction
- abort "Only numeric primary keys are supported"
- end
-
- max_dest_id =
- if options[:start]
- options[:start]
- elsif options[:swapped]
- max_id(dest_table, primary_key, where: options[:where], below: max_source_id)
- else
- max_id(dest_table, primary_key, where: options[:where])
- end
-
- if max_dest_id == 0 && !options[:swapped]
- min_source_id = min_id(source_table, primary_key, field, cast, starting_time, options[:where])
- max_dest_id = min_source_id - 1 if min_source_id
- end
-
- starting_id = max_dest_id
- fields = columns(source_table).map { |c| quote_ident(c) }.join(", ")
- batch_size = options[:batch_size]
-
- i = 1
- batch_count = ((max_source_id - starting_id) / batch_size.to_f).ceil
-
- if batch_count == 0
- log_sql "/* nothing to fill */"
- end
-
- while starting_id < max_source_id
- where = "#{quote_ident(primary_key)} > #{starting_id} AND #{quote_ident(primary_key)} <= #{starting_id + batch_size}"
- if starting_time
- where << " AND #{quote_ident(field)} >= #{sql_date(starting_time, cast)} AND #{quote_ident(field)} < #{sql_date(ending_time, cast)}"
- end
- if options[:where]
- where << " AND #{options[:where]}"
- end
-
- query = <<-SQL
-/* #{i} of #{batch_count} */
-INSERT INTO #{quote_table(dest_table)} (#{fields})
- SELECT #{fields} FROM #{quote_table(source_table)}
- WHERE #{where}
- SQL
-
- run_query(query)
-
- starting_id += batch_size
- i += 1
-
- if options[:sleep] && starting_id <= max_source_id
- sleep(options[:sleep])
- end
- end
- end
-
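fill copies rows in primary-key ranges rather than with OFFSET, so each batch is a simple range predicate on the primary key. With illustrative numbers and a hypothetical id column, the batching above works out to:

    starting_id   = 1_000      # max id already present in the destination
    max_source_id = 90_000     # max id in the source table
    batch_size    = 10_000     # --batch-size

    batch_count = ((max_source_id - starting_id) / batch_size.to_f).ceil
    # => 9 batches; the first WHERE clause has the shape
    #    id > 1000 AND id <= 11000 (plus any time-range and --where conditions)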
- def swap
- table = qualify_table(arguments.first)
- intermediate_table = intermediate_name(table)
- retired_table = retired_name(table)
-
- abort "Usage: pgslice swap <table>" if arguments.length != 1
- abort "Table not found: #{table}" unless table_exists?(table)
- abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table)
- abort "Table already exists: #{retired_table}" if table_exists?(retired_table)
-
- queries = [
- "ALTER TABLE #{quote_table(table)} RENAME TO #{quote_no_schema(retired_table)};",
- "ALTER TABLE #{quote_table(intermediate_table)} RENAME TO #{quote_no_schema(table)};"
- ]
-
- self.sequences(table).each do |sequence|
- queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_name"])} OWNED BY #{quote_ident(table)}.#{quote_ident(sequence["related_column"])};"
- end
-
- queries.unshift("SET LOCAL lock_timeout = '#{options[:lock_timeout]}';") if server_version_num >= 90300
-
- run_queries(queries)
- end
-
- def unswap
- table = qualify_table(arguments.first)
- intermediate_table = intermediate_name(table)
- retired_table = retired_name(table)
-
- abort "Usage: pgslice unswap <table>" if arguments.length != 1
- abort "Table not found: #{table}" unless table_exists?(table)
- abort "Table not found: #{retired_table}" unless table_exists?(retired_table)
- abort "Table already exists: #{intermediate_table}" if table_exists?(intermediate_table)
-
- queries = [
- "ALTER TABLE #{quote_table(table)} RENAME TO #{quote_no_schema(intermediate_table)};",
- "ALTER TABLE #{quote_table(retired_table)} RENAME TO #{quote_no_schema(table)};"
- ]
-
- self.sequences(table).each do |sequence|
- queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_name"])} OWNED BY #{quote_ident(table)}.#{quote_ident(sequence["related_column"])};"
- end
-
- run_queries(queries)
- end
-
- def analyze
- table = qualify_table(arguments.first)
- parent_table = options[:swapped] ? table : intermediate_name(table)
-
- abort "Usage: pgslice analyze <table>" if arguments.length != 1
-
- existing_tables = existing_partitions(table)
- analyze_list = existing_tables + [parent_table]
- run_queries_without_transaction analyze_list.map { |t| "ANALYZE VERBOSE #{quote_table(t)};" }
- end
-
- # arguments
-
- def parse_args(args)
- opts = Slop.parse(args) do |o|
- o.boolean "--intermediate"
- o.boolean "--swapped"
- o.float "--sleep"
- o.integer "--future", default: 0
- o.integer "--past", default: 0
- o.integer "--batch-size", default: 10000
- o.boolean "--dry-run", default: false
- o.boolean "--no-partition", default: false
- o.boolean "--trigger-based", default: false
- o.integer "--start"
- o.string "--url"
- o.string "--source-table"
- o.string "--dest-table"
- o.string "--where"
- o.string "--lock-timeout", default: "5s"
- o.on "-v", "--version", "print the version" do
- log PgSlice::VERSION
- @exit = true
- end
- end
- @arguments = opts.arguments
- @options = opts.to_hash
- rescue Slop::Error => e
- abort e.message
- end
-
- # output
-
- def log(message = nil)
- $stderr.puts message
- end
-
- def log_sql(message = nil)
- $stdout.puts message
- end
-
- def abort(message)
- raise PgSlice::Error, message
- end
-
- # database connection
-
- def connection
- @connection ||= begin
- url = options[:url] || ENV["PGSLICE_URL"]
- abort "Set PGSLICE_URL or use the --url option" unless url
-
- uri = URI.parse(url)
- params = CGI.parse(uri.query.to_s)
- # remove schema
- @schema = Array(params.delete("schema") || "public")[0]
- uri.query = URI.encode_www_form(params)
-
- ENV["PGCONNECT_TIMEOUT"] ||= "1"
- PG::Connection.new(uri.to_s)
- end
- rescue PG::ConnectionBad => e
- abort e.message
- rescue URI::InvalidURIError
- abort "Invalid url"
- end
-
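The connection URL does double duty: a schema query parameter selects the Postgres schema and is stripped before the URL reaches libpq. With an illustrative URL, the parsing above behaves like this:

    require "cgi"
    require "uri"

    uri    = URI.parse("postgres://localhost:5432/mydb?schema=analytics")
    params = CGI.parse(uri.query.to_s)                      # {"schema"=>["analytics"]}
    schema = Array(params.delete("schema") || "public")[0]  # "analytics" (default "public")
    uri.query = URI.encode_www_form(params)                 # schema removed from the query
    uri.to_s                                                # passed to PG::Connection.new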
- def schema
- connection # ensure called first
- @schema
- end
-
- def execute(query, params = [])
- connection.exec_params(query, params).to_a
- end
-
- def run_queries(queries)
- connection.transaction do
- execute("SET LOCAL client_min_messages TO warning") unless options[:dry_run]
- log_sql "BEGIN;"
- log_sql
- run_queries_without_transaction(queries)
- log_sql "COMMIT;"
- end
- end
-
- def run_query(query)
- log_sql query
- unless options[:dry_run]
- begin
- execute(query)
- rescue PG::ServerError => e
- abort("#{e.class.name}: #{e.message}")
- end
- end
- log_sql
- end
-
- def run_queries_without_transaction(queries)
- queries.each do |query|
- run_query(query)
- end
- end
-
- def server_version_num
- execute("SHOW server_version_num")[0]["server_version_num"].to_i
- end
-
- def existing_partitions(table, period = nil)
- count =
- case period
- when "day"
- 8
- when "month"
- 6
- else
- "6,8"
- end
-
- existing_tables(like: "#{table}_%").select { |t| /\A#{Regexp.escape("#{table}_")}\d{#{count}}\z/.match(t) }
- end
-
- def existing_tables(like:)
- query = "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE schemaname = $1 AND tablename LIKE $2"
- execute(query, like.split(".", 2)).map { |r| "#{r["schemaname"]}.#{r["tablename"]}" }.sort
- end
-
- def table_exists?(table)
- existing_tables(like: table).any?
- end
-
- def columns(table)
- execute("SELECT column_name FROM information_schema.columns WHERE table_schema || '.' || table_name = $1", [table]).map{ |r| r["column_name"] }
- end
-
- # http://stackoverflow.com/a/20537829
- def primary_key(table)
- query = <<-SQL
- SELECT
- pg_attribute.attname,
- format_type(pg_attribute.atttypid, pg_attribute.atttypmod)
- FROM
- pg_index, pg_class, pg_attribute, pg_namespace
- WHERE
- nspname || '.' || relname = $1 AND
- indrelid = pg_class.oid AND
- pg_class.relnamespace = pg_namespace.oid AND
- pg_attribute.attrelid = pg_class.oid AND
- pg_attribute.attnum = any(pg_index.indkey) AND
- indisprimary
- SQL
- execute(query, [table]).map { |r| r["attname"] }
- end
-
- def max_id(table, primary_key, below: nil, where: nil)
- query = "SELECT MAX(#{quote_ident(primary_key)}) FROM #{quote_table(table)}"
- conditions = []
- conditions << "#{quote_ident(primary_key)} <= #{below}" if below
- conditions << where if where
- query << " WHERE #{conditions.join(" AND ")}" if conditions.any?
- execute(query)[0]["max"].to_i
- end
-
- def min_id(table, primary_key, column, cast, starting_time, where)
- query = "SELECT MIN(#{quote_ident(primary_key)}) FROM #{quote_table(table)}"
- conditions = []
- conditions << "#{quote_ident(column)} >= #{sql_date(starting_time, cast)}" if starting_time
- conditions << where if where
- query << " WHERE #{conditions.join(" AND ")}" if conditions.any?
- (execute(query)[0]["min"] || 1).to_i
- end
-
- def has_trigger?(trigger_name, table)
- !fetch_trigger(trigger_name, table).nil?
- end
-
- def fetch_comment(table)
- execute("SELECT obj_description(#{regclass(table)}) AS comment")[0]
- end
-
- # http://www.dbforums.com/showthread.php?1667561-How-to-list-sequences-and-the-columns-by-SQL
- def sequences(table)
- query = <<-SQL
- SELECT
- a.attname as related_column,
- s.relname as sequence_name
- FROM pg_class s
- JOIN pg_depend d ON d.objid = s.oid
- JOIN pg_class t ON d.objid = s.oid AND d.refobjid = t.oid
- JOIN pg_attribute a ON (d.refobjid, d.refobjsubid) = (a.attrelid, a.attnum)
- JOIN pg_namespace n ON n.oid = s.relnamespace
- WHERE s.relkind = 'S'
- AND n.nspname = $1
- AND t.relname = $2
- SQL
- execute(query, [schema, table])
- end
-
- # helpers
-
- def trigger_name(table)
- "#{table.split(".")[-1]}_insert_trigger"
- end
-
- def intermediate_name(table)
- "#{table}_intermediate"
- end
-
- def retired_name(table)
- "#{table}_retired"
- end
-
- def column_cast(table, column)
- data_type = execute("SELECT data_type FROM information_schema.columns WHERE table_schema || '.' || table_name = $1 AND column_name = $2", [table, column])[0]["data_type"]
- data_type == "timestamp with time zone" ? "timestamptz" : "date"
- end
-
- def sql_date(time, cast, add_cast = true)
- if cast == "timestamptz"
- fmt = "%Y-%m-%d %H:%M:%S UTC"
- else
- fmt = "%Y-%m-%d"
- end
- str = "'#{time.strftime(fmt)}'"
- add_cast ? "#{str}::#{cast}" : str
- end
-
- def name_format(period)
- case period.to_sym
- when :day
- "%Y%m%d"
- else
- "%Y%m"
- end
- end
-
- def round_date(date, period)
- date = date.to_date
- case period.to_sym
- when :day
- date
- else
- Date.new(date.year, date.month)
- end
- end
-
- def advance_date(date, period, count = 1)
- date = date.to_date
- case period.to_sym
- when :day
- date.next_day(count)
- else
- date.next_month(count)
- end
- end
-
- def quote_ident(value)
- PG::Connection.quote_ident(value)
- end
-
- def quote_table(table)
- table.split(".", 2).map { |v| quote_ident(v) }.join(".")
- end
-
- def quote_no_schema(table)
- quote_ident(table.split(".", 2)[-1])
- end
-
- def regclass(table)
- "'#{quote_table(table)}'::regclass"
- end
-
- def fetch_trigger(trigger_name, table)
- execute("SELECT obj_description(oid, 'pg_trigger') AS comment FROM pg_trigger WHERE tgname = $1 AND tgrelid = #{regclass(table)}", [trigger_name])[0]
- end
-
- def qualify_table(table)
- table.to_s.include?(".") ? table : [schema, table].join(".")
- end
-
- def settings_from_trigger(original_table, table)
- trigger_name = self.trigger_name(original_table)
-
- needs_comment = false
- trigger_comment = fetch_trigger(trigger_name, table)
- comment = trigger_comment || fetch_comment(table)
- if comment
- field, period, cast = comment["comment"].split(",").map { |v| v.split(":").last } rescue [nil, nil, nil]
- end
-
- unless period
- needs_comment = true
- function_def = execute("select pg_get_functiondef(oid) from pg_proc where proname = $1", [trigger_name])[0]
- return [] unless function_def
- function_def = function_def["pg_get_functiondef"]
- sql_format = SQL_FORMAT.find { |_, f| function_def.include?("'#{f}'") }
- return [] unless sql_format
- period = sql_format[0]
- field = /to_char\(NEW\.(\w+),/.match(function_def)[1]
- end
-
- # backwards compatibility with 0.2.3 and earlier (pre-timestamptz support)
- unless cast
- cast = "date"
- # update comment to explicitly define cast
- needs_comment = true
- end
-
- [period, field, cast, needs_comment, !trigger_comment]
- end
-
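The per-table settings are round-tripped through a comment in a fixed key:value format, written by prep and unpacked here. For example (column name illustrative):

    comment = "column:created_at,period:day,cast:date"
    field, period, cast = comment.split(",").map { |v| v.split(":").last }
    # field  => "created_at"
    # period => "day"
    # cast   => "date"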
- def foreign_keys(table)
- execute("SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = #{regclass(table)} AND contype ='f'").map { |r| r["pg_get_constraintdef"] }
- end
- end
-end
+# modules
+require "pgslice/client"
+require "pgslice/generic_table"
+require "pgslice/table"
+require "pgslice/version"