require 'active_record/migration'
require 'active_record/connection_adapters/mysql_pt_osc_adapter'
require 'shellwords'

module ActiveRecord
  # Reopens the migrator so that database connections are verified before the
  # migrated version is recorded.
  class Migrator
    alias_method :record_version_state_after_migrating_without_reconnect, :record_version_state_after_migrating

    # Verifies active connections, then delegates to the original
    # implementation (kept under the `_without_reconnect` alias).
    #
    # @param version the migration version, passed through unchanged
    def record_version_state_after_migrating(version)
      ActiveRecord::Base.logger.debug 'Verifying active connections prior to recording version' if ActiveRecord::Base.logger
      ActiveRecord::Base.verify_active_connections! # Reconnect to DB if it's gone away while we were migrating.
      record_version_state_after_migrating_without_reconnect(version)
    end
  end

  # Raised when a migration uses an operation this class refuses to handle
  # (see PtOscMigration#execute).
  class UnsupportedMigrationError < ActiveRecordError; end

  # Migration that, after running the usual `up`/`down`/`change` body, either
  # prints or executes `pt-online-schema-change` commands for the tables the
  # connection adapter has collected commands for.
  class PtOscMigration < Migration
    # @TODO whitelist all valid pt-osc flags
    #
    # Flags that may be passed through to pt-online-schema-change. Per-flag
    # settings, as consumed by #percona_command and #command_flags:
    #   :default   - value used when neither the caller nor the config sets the flag
    #   :version   - Gem::Requirement string checked against the tool version
    #   :mutator   - name of an instance method that transforms the value
    #   :arguments - extra entries merged into the mutator's options hash
    #   :boolean   - emit `--flag` / `--no-flag` with no value
    PERCONA_FLAGS = {
      'defaults-file' => {
        mutator: :make_path_absolute,
      },
      'recursion-method' => {
        version: '>= 2.1',
      },
      'execute' => {
        default: false,
      },
      'check-alter' => {
        boolean: true,
        default: true,
        mutator: :execute_only,
        version: '>= 2.1',
      },
      'user' => {
        mutator: :get_from_config,
        arguments: {
          key_name: 'username',
        },
        default: nil,
      },
      'password' => {
        mutator: :get_from_config,
        default: nil,
      },
    }.freeze

    # @return [Hash] the flag whitelist (PERCONA_FLAGS)
    def self.percona_flags
      PERCONA_FLAGS
    end

    # Runs the migration in the given direction, then prints or executes the
    # pt-online-schema-change commands depending on the configured run mode.
    #
    # The run mode is read from `percona_config[:run_mode]` and defaults to
    # 'print'.
    #
    # @param direction [Symbol] :up or :down
    # @raise [ArgumentError] if the configured run mode is not 'print' or 'execute'
    def migrate(direction)
      return unless respond_to?(direction)

      run_mode = percona_config[:run_mode] || 'print'
      raise ArgumentError.new('Invalid run_mode specified in database config') unless run_mode.in? %w(print execute)

      case direction
      when :up   then announce 'migrating'
      when :down then announce 'reverting'
      end

      time = nil
      ActiveRecord::Base.connection_pool.with_connection do |conn|
        @connection = conn
        if respond_to?(:change)
          if direction == :down
            # Record the commands issued by `change` without running them,
            # then replay their inverse against the real connection.
            recorder = CommandRecorder.new(@connection)
            suppress_messages do
              @connection = recorder
              change
            end
            @connection = conn
            time = Benchmark.measure {
              self.revert {
                recorder.inverse.each do |cmd, args|
                  send(cmd, *args)
                end
              }
            }
          else
            time = Benchmark.measure { change }
          end
        else
          time = Benchmark.measure { send(direction) }
        end

        case run_mode
        when 'execute' then time += Benchmark.measure { execute_pt_osc }
        when 'print'   then print_pt_osc
        end

        @connection = nil
      end

      case direction
      when :up   then announce 'migrated (%.4fs)' % time.real; write
      when :down then announce 'reverted (%.4fs)' % time.real; write
      end
    end

    def method_missing(method, *arguments, &block) # :nodoc:
      # Putting this here ensures that pt_osc_migration shows up in the caller trace
      super
    end

    protected

    # Runs pt-online-schema-change for every table the connection has
    # commands for, then clears the collected commands. Does nothing unless
    # the connection is a MysqlPtOscAdapter with at least one commanded table.
    def execute_pt_osc
      return unless @connection.is_a? ActiveRecord::ConnectionAdapters::MysqlPtOscAdapter
      return if @connection.get_commanded_tables.empty?

      database_name = database_config[:database]
      announce 'running pt-online-schema-change'

      @connection.get_commanded_tables.each { |table| migrate_table(database_name, table) }
      @connection.clear_commands
    end

    # Runs the collected commands for one table: first as a dry run, then for
    # real. A failing dry run raises (see #execute_sql_for_table), so the real
    # run is never reached in that case.
    def migrate_table(database_name, table_name)
      execute_sql = @connection.get_commands_string(table_name)

      logger.info "Running on #{database_name}|#{table_name}: #{execute_sql}"
      [true, false].each { |dry_run| execute_sql_for_table(execute_sql, database_name, table_name, dry_run) }
    end

    # Builds and runs a single pt-online-schema-change invocation via
    # Kernel.system, logging the (password-sanitized) command and the outcome.
    #
    # @param dry_run [Boolean] when true the command is built with execute: false
    # @raise [RuntimeError] if Kernel.system does not report success
    def execute_sql_for_table(execute_sql, database_name, table_name, dry_run = true)
      command = percona_command(execute_sql, database_name, table_name, execute: !dry_run)
      logger.info "Command is #{self.class.sanitize_command(command)}"
      success = Kernel.system command
      if success
        logger.info "Successfully #{dry_run ? 'dry ran' : 'executed'} on #{database_name}|#{table_name}: #{execute_sql}"
      else
        failure_message = "Unable to #{dry_run ? 'dry run' : 'execute'} query on #{database_name}|#{table_name}: #{execute_sql}"
        logger.error failure_message
        raise RuntimeError.new(failure_message)
      end
    end

    # Writes the (password-sanitized) dry-run and execute commands for every
    # commanded table instead of running them, then clears the collected
    # commands. Does nothing unless the connection is a MysqlPtOscAdapter.
    def print_pt_osc
      return unless @connection.is_a? ActiveRecord::ConnectionAdapters::MysqlPtOscAdapter

      database_name = database_config[:database]

      @connection.get_commanded_tables.each do |table_name|
        execute_sql = @connection.get_commands_string(table_name)

        announce 'Run the following commands:'

        [true, false].each do |dry_run|
          write self.class.sanitize_command(percona_command(execute_sql, database_name, table_name, execute: !dry_run))
        end
      end

      @connection.clear_commands
    end

    # Builds the shell-escaped pt-online-schema-change command line.
    #
    # Option precedence is: explicitly passed `options`, then the `percona`
    # section of the database config, then the defaults in PERCONA_FLAGS.
    # Options not listed in PERCONA_FLAGS are dropped.
    #
    # @param execute_sql [String, nil] the ALTER clause; nil is sent as ''
    # @param database_name [String]
    # @param table_name [String]
    # @param options [Hash] flag overrides (string or symbol keys)
    # @return [String] the command, joined with Shellwords
    def percona_command(execute_sql, database_name, table_name, options = {})
      command = ['pt-online-schema-change', '--alter', execute_sql || '', "D=#{database_name},t=#{table_name}"]

      # Whitelist
      options = HashWithIndifferentAccess.new(options)
      options = options.slice(*self.class.percona_flags.keys)

      # Merge config
      config = percona_config
      if config
        config.slice(*self.class.percona_flags.keys).each do |key, value|
          # Only fill in values the caller left unset (nil). An explicit
          # `false` -- e.g. `execute: false` for the dry-run pass -- must not
          # be replaced by a truthy config value, which `||=` would do.
          options[key] = value if options[key].nil?
        end
      end

      # Set defaults
      self.class.percona_flags.each do |flag, flag_config|
        options[flag] = flag_config[:default] if flag_config.key?(:default) && !options.key?(flag)
      end

      command_parts = command + [run_mode_flag(options)] + command_flags(options)

      command_parts.shelljoin
    end

    # @return [Gem::Version] version parsed from the tool's `--version`
    #   output, memoized at class level
    def self.tool_version
      @_tool_version ||= Gem::Version.new(get_tool_version.sub('pt-online-schema-change', '').strip)
    end

    # @return [Hash] the raw database config with indifferent access (memoized)
    def database_config
      @db_config ||= raw_database_config.with_indifferent_access
    end

    # @return [Hash] the `percona` section of the database config, or {} if absent
    def percona_config
      database_config[:percona] || {}
    end

    # Opens the log file in append mode. The path comes from
    # `percona_config[:log]`, defaulting to 'log/pt_osc.log', and is made
    # absolute via #make_path_absolute.
    #
    # @return [File] an open handle; the caller owns it
    def logfile
      File.open(make_path_absolute(percona_config[:log] || 'log/pt_osc.log'), 'a')
    end

    # @return [Logger] memoized logger writing to #logfile with progname 'pt-osc'
    def logger
      return @logger if @logger
      @logger = Logger.new(logfile)
      @logger.formatter = Logger::Formatter.new # Don't let ActiveSupport override with SimpleFormatter
      @logger.progname = 'pt-osc'
      @logger
    end

    private

    # @return [Hash] the config of the current connection's pool spec, falling
    #   back to ActiveRecord::Base.connection_config
    def raw_database_config
      connection.pool.spec.config || ActiveRecord::Base.connection_config
    end

    # Turns the resolved options into a flat list of command-line arguments.
    #
    # 'execute' is skipped here (it is rendered by #run_mode_flag). A flag is
    # also skipped when its version requirement is not met by the installed
    # tool or when its mutator returns nil. Boolean flags are emitted as
    # `--flag` or `--no-flag` without a value.
    #
    # @param options [Hash] resolved flag values keyed by flag name
    # @return [Array<String>]
    def command_flags(options)
      options.flat_map do |key, value|
        next if key == 'execute'
        flag_options = self.class.percona_flags[key]

        # Satisfy version requirements
        if flag_options.try(:key?, :version)
          next unless Gem::Requirement.new(flag_options[:version]).satisfied_by? self.class.tool_version
        end

        # Mutate the value if needed
        if flag_options.try(:key?, :mutator)
          value = send(flag_options[:mutator], value, { all_options: options, flag_name: key }.merge(flag_options[:arguments] || {}))
          next if value.nil? # Allow a mutator to determine the flag shouldn't be used
        end

        # Handle boolean flags
        if flag_options.try(:[], :boolean)
          key = "no-#{key}" unless value
          value = nil
        end

        # Skipped flags (nil) and the nil value of boolean flags are removed
        # by the trailing `compact`.
        ["--#{key}", value]
      end.compact
    end

    # @return [String] '--execute' when options[:execute] is truthy, else '--dry-run'
    def run_mode_flag(options)
      options[:execute] ? '--execute' : '--dry-run'
    end

    # NOTE: `private` above does not apply to methods defined with `def self.`,
    # so the two class methods below remain publicly callable;
    # `sanitize_command` is invoked as `self.class.sanitize_command`.

    # @return [String] raw output of `pt-online-schema-change --version`
    def self.get_tool_version
      `pt-online-schema-change --version`
    end

    # Replaces the argument following `--password` with '_hidden_' so the
    # command can be logged or printed.
    #
    # @param command [String] a shell command line
    # @return [String] the command re-joined with Shellwords
    def self.sanitize_command(command)
      command_parts = command.shellsplit
      password_index = command_parts.find_index('--password')
      command_parts[password_index + 1] = '_hidden_' unless password_index.nil? || command_parts.length == password_index + 1
      command_parts.shelljoin
    end

    # Flag mutators

    # @param path [String]
    # @return [String] `path` unchanged if it starts with '/', otherwise
    #   expanded against the current working directory
    def make_path_absolute(path, _ = {})
      return path if path[0] == '/'
      # If path is not already absolute, treat it as relative to the app root
      File.expand_path(path, Dir.getwd)
    end

    # Returns the flag's value only when the command is being executed;
    # otherwise returns the flag's default from PERCONA_FLAGS.
    def execute_only(flag, options = {})
      options[:all_options][:execute] ? flag : self.class.percona_flags[options[:flag_name]][:default]
    end

    # Resolves a flag value against the database config:
    #   nil   -> looked up in database_config under :key_name (or :flag_name)
    #   false -> nil, which makes #command_flags omit the flag
    #   else  -> the given value unchanged
    def get_from_config(flag, options = {})
      case flag
      when nil
        database_config[options[:key_name] || options[:flag_name]]
      when false
        nil
      else
        flag
      end
    end

    # Raw SQL is rejected outright.
    #
    # @raise [ActiveRecord::UnsupportedMigrationError] always
    def execute(sql)
      raise ActiveRecord::UnsupportedMigrationError.new("Raw `execute` isn't supported by the pt-osc gem.")
    end
  end
end