require "pgslice/version"
require "slop"
require "pg"
require "uri"
require "active_support/all"

module PgSlice
  # Raised for every user-facing failure (bad usage, missing table, ...).
  class Error < StandardError; end

  # Command line client that drives time-based table partitioning in
  # PostgreSQL through an insert trigger on a parent table.
  #
  # The first positional argument is the command name; the remaining
  # arguments and the parsed flags are exposed via +arguments+ / +options+.
  class Client
    attr_reader :arguments, :options

    # Maps a partition period to the to_char() format used both in the
    # trigger function and to recover the period from that function later.
    SQL_FORMAT = {
      day: "YYYYMMDD",
      month: "YYYYMM"
    }.freeze

    # @param args [Array<String>] raw command line arguments
    # @raise [PgSlice::Error] when option parsing fails
    def initialize(args)
      parse_args(args)
      @command = @arguments.shift
    end

    # Dispatches to the requested command. The database connection, if one
    # was opened, is always closed afterwards.
    #
    # @raise [PgSlice::Error] on unknown commands or any command failure
    def perform
      return if @exit

      case @command
      when "prep"
        prep
      when "add_partitions"
        add_partitions
      when "fill"
        fill
      when "swap"
        swap
      when "unswap"
        unswap
      when "unprep"
        unprep
      when nil
        log "Commands: add_partitions, fill, prep, swap, unprep, unswap"
      else
        abort "Unknown command: #{@command}"
      end
    ensure
      @connection.close if @connection
    end

    protected

    # commands

    # Creates the intermediate table plus the trigger function and trigger
    # that route inserted rows into per-period partitions.
    def prep
      abort "Usage: pgslice prep <table> <column> <period>" if arguments.length != 3

      table, column, period = arguments
      intermediate_table = intermediate_name(table)
      function_name = trigger_name(table)

      abort "Table not found: #{table}" unless table_exists?(table)
      abort "Table already exists: #{intermediate_table}" if table_exists?(intermediate_table)
      abort "Column not found: #{column}" unless columns(table).include?(column)

      sql_format = SQL_FORMAT[period.to_sym]
      abort "Invalid period: #{period}" unless sql_format

      log "Creating #{intermediate_table} from #{table}"

      queries = []

      queries << <<-SQL
        CREATE TABLE #{intermediate_table} (
          LIKE #{table} INCLUDING INDEXES INCLUDING DEFAULTS
        )
      SQL

      queries << <<-SQL
        CREATE FUNCTION #{function_name}()
        RETURNS trigger AS $$
        BEGIN
          EXECUTE 'INSERT INTO public.#{table}_' || to_char(NEW.#{column}, '#{sql_format}') || ' VALUES ($1.*)' USING NEW;
          RETURN NULL;
        END;
        $$ LANGUAGE plpgsql
      SQL

      queries << <<-SQL
        CREATE TRIGGER #{function_name}
        BEFORE INSERT ON #{intermediate_table}
        FOR EACH ROW EXECUTE PROCEDURE #{function_name}()
      SQL

      run_queries(queries)
    end

    # Drops the intermediate table (and, via CASCADE, its dependents) along
    # with the trigger function created by +prep+.
    def unprep
      abort "Usage: pgslice unprep <table>" if arguments.length != 1

      table = arguments.first
      intermediate_table = intermediate_name(table)

      abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table)

      log "Dropping #{intermediate_table}"

      # IF EXISTS keeps unprep usable when the function was already removed.
      run_queries([
        "DROP TABLE #{intermediate_table} CASCADE",
        "DROP FUNCTION IF EXISTS #{trigger_name(table)}()"
      ])
    end

    # Creates any missing partitions for the periods from --past periods ago
    # through --future periods ahead of the current period.
    def add_partitions
      abort "Usage: pgslice add_partitions <table>" if arguments.length != 1

      original_table = arguments.first
      table = options[:intermediate] ? intermediate_name(original_table) : original_table

      abort "Table not found: #{table}" unless table_exists?(table)

      # ensure table has trigger
      unless has_trigger?(trigger_name(original_table), table)
        abort "No trigger on table: #{table}\nDid you mean to use --intermediate?"
      end

      _period, field, name_format, inc, today = settings_from_table(original_table, table)
      date_format = "%Y-%m-%d"
      offsets = (-1 * options[:past])..options[:future]

      queries = []
      offsets.each do |n|
        day = today + (n * inc)
        partition_name = "#{original_table}_#{day.strftime(name_format)}"
        next if table_exists?(partition_name)

        log "Creating #{partition_name} from #{table}"

        queries << <<-SQL
          CREATE TABLE #{partition_name} (
            LIKE #{table} INCLUDING INDEXES INCLUDING DEFAULTS,
            CHECK (#{field} >= '#{day.strftime(date_format)}'::date AND #{field} < '#{(day + inc).strftime(date_format)}'::date)
          ) INHERITS (#{table})
        SQL
      end

      run_queries(queries) if queries.any?
    end

    # Copies rows from the source table into the destination table in
    # primary-key batches, restricted to the time range covered by the
    # existing partitions. With --swapped the retired table is the source.
    def fill
      abort "Usage: pgslice fill <table>" if arguments.length != 1

      table = arguments.first

      if options[:swapped]
        source_table = retired_name(table)
        dest_table = table
      else
        source_table = table
        dest_table = intermediate_name(table)
      end

      abort "Table not found: #{source_table}" unless table_exists?(source_table)
      abort "Table not found: #{dest_table}" unless table_exists?(dest_table)

      period, field, name_format, inc, _today = settings_from_table(table, dest_table)
      date_format = "%Y-%m-%d"

      # Partition names end in YYYYMMDD (day) or YYYYMM (month); capture
      # exactly that suffix instead of assuming a fixed 8 characters.
      suffix_length = period == :day ? 8 : 6
      partition_pattern = /\A#{Regexp.escape("#{table}_")}(\d{#{suffix_length}})\z/
      suffixes = existing_tables(like: "#{table}_%").map { |name| name[partition_pattern, 1] }.compact
      abort "No partitions found for table: #{table}" if suffixes.empty?

      starting_time = DateTime.strptime(suffixes.first, name_format)
      ending_time = DateTime.strptime(suffixes.last, name_format) + inc

      key_column = primary_key(table)
      abort "No primary key on table: #{table}" unless key_column

      batch_size = options[:batch_size]
      # A non-positive batch size would never advance the loop below.
      abort "Invalid batch size: #{batch_size}" unless batch_size.to_i > 0

      max_source_id = max_id(source_table, key_column)
      max_dest_id = max_id(dest_table, key_column)
      starting_id = max_dest_id + 1
      fields = columns(source_table).join(", ")

      # Nothing left to copy.
      return if starting_id > max_source_id

      log "#{key_column}: #{starting_id} -> #{max_source_id}"
      log "time: #{starting_time.to_date} -> #{ending_time.to_date}"

      # <= so that the row whose id equals max_source_id is copied too.
      while starting_id <= max_source_id
        log "#{starting_id}..#{[starting_id + batch_size - 1, max_source_id].min}"

        query = "INSERT INTO #{dest_table} (#{fields}) SELECT #{fields} FROM #{source_table} WHERE #{key_column} >= #{starting_id} AND #{key_column} < #{starting_id + batch_size} AND #{field} >= '#{starting_time.strftime(date_format)}'::date AND #{field} < '#{ending_time.strftime(date_format)}'::date"
        log query if options[:debug]
        execute(query)

        starting_id += batch_size

        # Only pause when another batch follows.
        sleep(options[:sleep]) if options[:sleep] && starting_id <= max_source_id
      end
    end

    # Renames the live table to the retired name and the intermediate table
    # to the live name.
    def swap
      abort "Usage: pgslice swap <table>" if arguments.length != 1

      table = arguments.first
      intermediate_table = intermediate_name(table)
      retired_table = retired_name(table)

      abort "Table not found: #{table}" unless table_exists?(table)
      abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table)
      abort "Table already exists: #{retired_table}" if table_exists?(retired_table)

      log "Renaming #{table} to #{retired_table}"
      log "Renaming #{intermediate_table} to #{table}"

      run_queries([
        "ALTER TABLE #{table} RENAME TO #{retired_table}",
        "ALTER TABLE #{intermediate_table} RENAME TO #{table}"
      ])
    end

    # Reverses +swap+: the live table becomes the intermediate table again
    # and the retired table becomes the live table.
    def unswap
      abort "Usage: pgslice unswap <table>" if arguments.length != 1

      table = arguments.first
      intermediate_table = intermediate_name(table)
      retired_table = retired_name(table)

      abort "Table not found: #{table}" unless table_exists?(table)
      abort "Table not found: #{retired_table}" unless table_exists?(retired_table)
      abort "Table already exists: #{intermediate_table}" if table_exists?(intermediate_table)

      log "Renaming #{table} to #{intermediate_table}"
      log "Renaming #{retired_table} to #{table}"

      run_queries([
        "ALTER TABLE #{table} RENAME TO #{intermediate_table}",
        "ALTER TABLE #{retired_table} RENAME TO #{table}"
      ])
    end

    # arguments

    # Parses flags with Slop and stores positional arguments and options.
    # --version logs the version and marks the client so +perform+ is a no-op.
    def parse_args(args)
      opts = Slop.parse(args) do |o|
        o.boolean "--intermediate"
        o.boolean "--swapped"
        o.boolean "--debug"
        o.float "--sleep"
        o.integer "--future", default: 3
        o.integer "--past", default: 3
        o.integer "--batch-size", default: 10000
        o.on "-v", "--version", "print the version" do
          log PgSlice::VERSION
          @exit = true
        end
      end
      @arguments = opts.arguments
      @options = opts.to_hash
    rescue Slop::Error => e
      abort e.message
    end

    # output

    # Writes a progress message to standard error.
    def log(message)
      $stderr.puts message
    end

    # Shadows Kernel#abort: raises instead of exiting the process.
    #
    # @raise [PgSlice::Error] always
    def abort(message)
      raise PgSlice::Error, message
    end

    # database connection

    # Lazily opens (and memoizes) a connection built from PGSLICE_URL.
    #
    # @raise [PgSlice::Error] when PGSLICE_URL is not set
    def connection
      @connection ||= begin
        abort "Set PGSLICE_URL" unless ENV["PGSLICE_URL"]
        uri = URI.parse(ENV["PGSLICE_URL"])
        uri_parser = URI::Parser.new
        config = {
          host: uri.host,
          port: uri.port,
          dbname: uri.path.sub(/\A\//, ""),
          user: uri.user,
          password: uri.password,
          connect_timeout: 1
        }.reject { |_, value| value.to_s.empty? }
        # Percent-decode string components (e.g. passwords with symbols).
        # Only existing keys are reassigned, which is safe while iterating.
        config.each do |key, value|
          config[key] = uri_parser.unescape(value) if value.is_a?(String)
        end
        PG::Connection.new(config)
      end
    end

    # Runs a single parameterized query and returns the rows as hashes.
    def execute(query, params = [])
      connection.exec_params(query, params).to_a
    end

    # Runs all queries inside one transaction, each collapsed to one line.
    def run_queries(queries)
      connection.transaction do
        execute("SET client_min_messages TO warning")
        queries.map(&:squish).each do |query|
          execute(query)
        end
      end
    end

    # Returns the sorted names of public tables matching a LIKE pattern.
    def existing_tables(like:)
      query = "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = $1 AND tablename LIKE $2"
      execute(query, ["public", like]).map { |r| r["tablename"] }.sort
    end

    # Exact-name existence check. Uses = rather than LIKE because "_" in a
    # table name would otherwise act as a single-character wildcard.
    def table_exists?(table)
      query = "SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = $1 AND tablename = $2"
      execute(query, ["public", table]).any?
    end

    # Returns the column names of a public table.
    def columns(table)
      query = "SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name = $1"
      execute(query, [table]).map { |r| r["column_name"] }
    end

    # Returns the name of the first primary key column, or nil if none.
    # http://stackoverflow.com/a/20537829
    def primary_key(table)
      query = <<-SQL
        SELECT
          pg_attribute.attname,
          format_type(pg_attribute.atttypid, pg_attribute.atttypmod)
        FROM
          pg_index, pg_class, pg_attribute, pg_namespace
        WHERE
          pg_class.oid = $2::regclass AND
          indrelid = pg_class.oid AND
          nspname = $1 AND
          pg_class.relnamespace = pg_namespace.oid AND
          pg_attribute.attrelid = pg_class.oid AND
          pg_attribute.attnum = any(pg_index.indkey) AND
          indisprimary
      SQL
      row = execute(query, ["public", table])[0]
      row && row["attname"]
    end

    # Returns MAX(primary_key) as an integer; 0 when the table is empty.
    def max_id(table, primary_key)
      execute("SELECT MAX(#{primary_key}) FROM #{table}")[0]["max"].to_i
    end

    # True when a trigger with the given name exists on public.<table>.
    def has_trigger?(trigger_name, table)
      query = "SELECT 1 FROM pg_trigger WHERE tgname = $1 AND tgrelid = $2::regclass"
      execute(query, [trigger_name, "public.#{table}"]).any?
    end

    # helpers

    def trigger_name(table)
      "#{table}_insert_trigger"
    end

    def intermediate_name(table)
      "#{table}_intermediate"
    end

    def retired_name(table)
      "#{table}_retired"
    end

    # Recovers the partitioning settings by reading the trigger function
    # definition back from the database.
    #
    # +table+ is accepted for call-site symmetry but is not consulted.
    #
    # @return [Array] period (:day/:month), partition column name, strftime
    #   format for partition suffixes, period duration, start of the
    #   current period
    # @raise [PgSlice::Error] when the function is missing or unparseable
    def settings_from_table(original_table, table)
      function_name = trigger_name(original_table)
      row = execute("select pg_get_functiondef(oid) from pg_proc where proname = $1", [function_name]).first
      abort "Could not read settings" unless row

      function_def = row["pg_get_functiondef"]
      sql_format = SQL_FORMAT.find { |_, f| function_def.include?("'#{f}'") }
      abort "Could not read settings" unless sql_format

      field_match = /to_char\(NEW\.(\w+),/.match(function_def)
      abort "Could not read settings" unless field_match

      period = sql_format[0]
      field = field_match[1]
      today = Time.now

      case period
      when :day
        name_format = "%Y%m%d"
        inc = 1.day
        today = today.beginning_of_day
      else
        name_format = "%Y%m"
        inc = 1.month
        today = today.beginning_of_month
      end

      [period, field, name_format, inc, today]
    end
  end
end