# frozen_string_literal: true

module ActiveRecord
  module ConnectionAdapters
    module Elasticsearch
      # Extends the adapter with table-related (index-related) statements.
      module TableStatements
        extend ActiveSupport::Concern

        included do
          # ORIGINAL methods untouched:
          #
          # SUPPORTED but not used:
          # -
          #
          # UNSUPPORTED methods that will be ignored:
          # - native_database_types
          # - table_options
          # - table_comment
          # - table_alias_for
          #
          # UNSUPPORTED methods that will fail:
          # - create_join_table
          # - drop_join_table
          # - create_alter_table
          # - change_column_default
          # - change_column_null
          # - rename_column
          #
          # NOTE(review): +:rename_table+ is also passed below although this module
          # implements +rename_table+ further down; presumably the later +def+ wins -
          # confirm against +define_unsupported_method+ before removing it from the list.
          define_unsupported_method :create_join_table, :drop_join_table, :create_alter_table,
                                    :change_column_default, :change_column_null, :rename_column, :rename_table

          # Opens a closed index.
          # Clears the schema cache entry of the table before calling the API.
          # @param [String] table_name
          # @return [Boolean] acknowledged status
          def open_table(table_name)
            schema_cache.clear_data_source_cache!(table_name)
            api(:indices, :open, { index: table_name }, 'OPEN TABLE').dig('acknowledged')
          end

          # Opens closed indices.
          # The schema-migration and internal-metadata tables are removed from the provided names.
          # @param [Array] table_names
          # @return [Array, nil] acknowledged status for each provided table (nil if no table names remain)
          def open_tables(*table_names)
            table_names -= [schema_migration.table_name, InternalMetadata.table_name]
            return if table_names.empty?

            table_names.map { |table_name| open_table(table_name) }
          end

          # Closes an index.
          # Clears the schema cache entry of the table before calling the API.
          # @param [String] table_name
          # @return [Boolean] acknowledged status
          def close_table(table_name)
            schema_cache.clear_data_source_cache!(table_name)
            api(:indices, :close, { index: table_name }, 'CLOSE TABLE').dig('acknowledged')
          end

          # Closes indices by provided names.
          # The schema-migration and internal-metadata tables are removed from the provided names.
          # @param [Array] table_names
          # @return [Array, nil] acknowledged status for each provided table (nil if no table names remain)
          def close_tables(*table_names)
            table_names -= [schema_migration.table_name, InternalMetadata.table_name]
            return if table_names.empty?

            table_names.map { |table_name| close_table(table_name) }
          end

          # Refreshes an index.
          # A refresh makes recent operations performed on one or more indices available for search.
          # Raises an exception if the index could not be found.
          #
          # @param [String] table_name
          # @return [Boolean] result state (returns false if refreshing failed)
          def refresh_table(table_name)
            # success means: the response reports zero failed shards
            api(:indices, :refresh, { index: table_name }, 'REFRESH TABLE').dig('_shards', 'failed') == 0
          end

          # Refreshes indices by provided names.
          # The schema-migration and internal-metadata tables are removed from the provided names.
          # @param [Array] table_names
          # @return [Array, nil] result state for each table (nil if no table names remain)
          def refresh_tables(*table_names)
            table_names -= [schema_migration.table_name, InternalMetadata.table_name]
            return if table_names.empty?

            table_names.map { |table_name| refresh_table(table_name) }
          end

          # Truncates the index by provided name.
          # HINT: Elasticsearch does not have a +truncate+ concept:
          # - so we have to store the current index' schema
          # - drop the index
          # - and create it again
          # @param [String] table_name
          # @return [Boolean] acknowledged status
          def truncate_table(table_name)
            # force: automatically drops an existing index
            create_table(table_name, force: true, **table_schema(table_name))
          end
          alias :truncate :truncate_table

          # Truncates indices by provided names.
          # The schema-migration and internal-metadata tables are removed from the provided names.
          # @param [Array] table_names
          # @return [Array, nil] acknowledged status for each provided table (nil if no table names remain)
          def truncate_tables(*table_names)
            table_names -= [schema_migration.table_name, InternalMetadata.table_name]
            return if table_names.empty?

            table_names.map { |table_name| truncate_table(table_name) }
          end

          # Drops an index.
          # [<tt>:if_exists</tt>]
          #   Set to +true+ to only drop the table if it exists.
          #   Defaults to false.
          # Any further keyword arguments are accepted and discarded.
          # @param [String] table_name
          # @param [Boolean] if_exists
          # @return [Boolean] acknowledged status
          def drop_table(table_name, if_exists: false, **)
            schema_cache.clear_data_source_cache!(table_name)
            api(
              :indices,
              :delete,
              { index: table_name, ignore: (if_exists ? 404 : nil) },
              'DROP TABLE'
            ).dig('acknowledged')
          end

          # Blocks access to the provided table (index) and +block+ name.
          # @param [String] table_name
          # @param [Symbol] block_name The block to add (one of :read, :write, :read_only or :metadata)
          # @return [Boolean] acknowledged status
          def block_table(table_name, block_name = :write)
            api(
              :indices,
              :add_block,
              { index: table_name, block: block_name },
              "BLOCK #{block_name.to_s.upcase} TABLE"
            ).dig('acknowledged')
          end

          # Unblocks access to the provided table (index) and +block+ name.
          # Provide a nil-value to unblock all blocks, otherwise provide the blocked name.
          # @param [String] table_name
          # @param [Symbol, nil] block_name The block to remove (one of :read, :write, :read_only or :metadata)
          # @return [Boolean] acknowledged status
          def unblock_table(table_name, block_name = nil)
            if block_name.nil?
              # reset every known block setting within a single change
              change_table(table_name) do |t|
                t.change_setting('index.blocks.read', nil)
                t.change_setting('index.blocks.write', nil)
                t.change_setting('index.blocks.read_only', nil)
                t.change_setting('index.blocks.metadata', nil)
              end
            else
              change_setting(table_name, "index.blocks.#{block_name}", nil)
            end
          end

          # Clones an entire table (index) with its docs to the provided +target_name+.
          # During cloning, the table will be automatically 'write'-blocked.
          # An optional block is yielded with the clone definition before it gets executed.
          # @param [String] table_name
          # @param [String] target_name
          # @param [Hash] options
          # @return [Boolean] acknowledged status
          def clone_table(table_name, target_name, **options)
            # create new definition
            definition = clone_table_definition(table_name, target_name, **extract_table_options!(options))

            # yield optional block
            if block_given?
              definition.assign do |d|
                yield d
              end
            end

            # execute definition query(ies)
            definition.exec!
          end

          # Creates a backup (snapshot) of the entire table (index) from provided +table_name+.
          # The backup will be closed (unless +close: false+), to prevent read/write access.
          # The target name (+to+) will be auto-generated, if not provided.
          #
          # @example
          #   backup_table('screenshots', to: 'screenshots-backup-v1')
          #
          # @param [String] table_name
          # @param [String] to - target_name
          # @param [Boolean] close - closes backup after creation (default: true)
          # @return [String] backup_name
          # @raise [ArgumentError] if the target already exists
          def backup_table(table_name, to: nil, close: true)
            # default name: '<table>-snapshot-<epoch seconds + milliseconds>'
            to ||= "#{table_name}-snapshot-#{Time.now.strftime('%s%3N')}"

            raise ArgumentError, "unable to backup '#{table_name}' to already existing target '#{to}'!" if table_exists?(to)

            clone_table(table_name, to)
            close_table(to) if close

            to
          end

          # Restores an entire table (index) from provided backup (+from+).
          # The +table_name+ will be dropped, if exists.
          # The +from+ will persist, if not provided +drop_backup:true+.
          #
          # @example
          #   restore_table('screenshots', from: 'screenshots-backup-v1')
          #
          # @param [String] table_name
          # @param [String] from
          # @param [String (frozen)] timeout - renaming timeout (default: '30s')
          # @param [Boolean] open - opens the restored table after creation (default: true)
          # @param [Boolean] drop_backup - renames the backup instead of cloning it (default: false)
          # @return [Boolean, nil] acknowledged status of the open call (nil if +open+ is false)
          # @raise [ArgumentError] if the backup (+from+) does not exist
          def restore_table(table_name, from:, timeout: nil, open: true, drop_backup: false)
            raise ArgumentError, "unable to restore from missing target '#{from}'!" unless table_exists?(from)

            drop_table(table_name, if_exists: true)

            # choose best strategy
            if drop_backup
              rename_table(from, table_name, timeout: timeout)
            else
              clone_table(from, table_name)
            end

            # open the restored table, if requested.
            # The restored +table_name+ is the target here - not +from+:
            # with +drop_backup+ the backup was just dropped by the rename and can no longer be opened.
            open_table(table_name) if open
          end

          # Renames a table (index) by executing multiple steps:
          # - clone table
          # - wait for 'green' state
          # - drop old table
          # The +timeout+ option will define how long to wait for the 'green' state.
          #
          # @param [String] table_name
          # @param [String] target_name
          # @param [String (frozen)] timeout (default: '30s')
          # @param [Hash] options - additional 'clone' options (like settings, alias, ...)
          # @return [Boolean] acknowledged status of the final drop
          def rename_table(table_name, target_name, timeout: nil, **options)
            schema_cache.clear_data_source_cache!(table_name)

            clone_table(table_name, target_name, **options)
            cluster_health(index: target_name, wait_for_status: 'green', timeout: timeout.presence || '30s')
            drop_table(table_name)
          end

          # Creates a new table (index).
          # [<tt>:force</tt>]
          #   Set to +true+ to drop an existing table
          #   Defaults to false.
          # [<tt>:copy_from</tt>]
          #   Set to an existing index, to copy its schema.
          # [<tt>:if_not_exists</tt>]
          #   Set to +true+ to skip creation if table already exists.
          #   Defaults to false.
          # An optional block is yielded with the create definition before it gets executed.
          # @param [String] table_name
          # @param [Boolean] force - force a drop on the existing table (default: false)
          # @param [nil, String] copy_from - copy schema from existing table
          # @param [Boolean] if_not_exists - skip creation if the table already exists (default: false)
          # @param [Hash] options
          # @return [Boolean, nil] acknowledged status (nil if creation was skipped)
          def create_table(table_name, force: false, copy_from: nil, if_not_exists: false, **options)
            return if if_not_exists && table_exists?(table_name)

            # copy schema from existing table
            # (read before a possible forced drop below, so +copy_from+ may equal +table_name+)
            options.merge!(table_schema(copy_from)) if copy_from

            # create new definition
            definition = create_table_definition(table_name, **extract_table_options!(options))

            # yield optional block
            if block_given?
              definition.assign do |d|
                yield d
              end
            end

            # force drop existing table
            if force
              drop_table(table_name, if_exists: true)
            else
              schema_cache.clear_data_source_cache!(table_name.to_s)
            end

            # execute definition query(ies)
            definition.exec!
          end

          # A block for changing mappings, settings & aliases in +table+.
          #
          #   # change_table() yields a ChangeTableDefinition instance
          #   change_table(:suppliers) do |t|
          #     t.mapping :name, :string
          #     # Other column alterations here
          #   end
          #
          # @param [String] table_name
          # @param [Boolean] if_exists - skip silently if the table does not exist (default: false)
          # @param [Boolean] recreate - drop & re-create the table with a copy of its current schema (default: false)
          # @param [Hash] options
          def change_table(table_name, if_exists: false, recreate: false, **options, &block)
            return if if_exists && !table_exists?(table_name)

            # check 'recreate' flag.
            # If true, a 'create_table' with copy of the current will be executed
            return create_table(table_name, force: true, copy_from: table_name, **options, &block) if recreate

            # build new update definition
            definition = update_table_definition(table_name, self, **options)

            # yield optional block
            if block_given?
              definition.assign do |d|
                yield d
              end
            end

            # execute definition query(ies)
            definition.exec!
          end

          # Copies documents from a source to a destination.
          # Additional +options+ are merged into the request arguments (top-level, next to +body+).
          # @param [String] table_name
          # @param [String] target_name
          # @param [Hash] options
          # @return [Hash] reindex stats
          def reindex_table(table_name, target_name, **options)
            api(
              :core,
              :reindex,
              { body: { source: { index: table_name }, dest: { index: target_name } } }.merge(options),
              'REINDEX TABLE'
            )
          end

          # -- mapping -------------------------------------------------------------------------------------------------

          def add_mapping(table_name, name, type, **options, &block)
            _exec_change_table_with(:add_mapping, table_name, name, type, **options, &block)
          end
          alias :add_column :add_mapping

          # will fail unless +recreate:true+ option was provided
          def change_mapping(table_name, name, type, **options, &block)
            _exec_change_table_with(:change_mapping, table_name, name, type, **options, &block)
          end
          alias :change_column :change_mapping

          def remove_mapping(table_name, name, **options)
            _exec_change_table_with(:remove_mapping, table_name, name, **options)
          end
          alias :remove_column :remove_mapping

          def change_mapping_meta(table_name, name, **options)
            _exec_change_table_with(:change_mapping_meta, table_name, name, **options)
          end

          def change_mapping_attributes(table_name, name, **options, &block)
            _exec_change_table_with(:change_mapping_attributes, table_name, name, **options, &block)
          end

          def change_meta(table_name, name, value, **options)
            _exec_change_table_with(:change_meta, table_name, name, value, **options)
          end

          def remove_meta(table_name, name, **options)
            _exec_change_table_with(:remove_meta, table_name, name, **options)
          end

          # -- setting -------------------------------------------------------------------------------------------------

          def add_setting(table_name, name, value, **options, &block)
            _exec_change_table_with(:add_setting, table_name, name, value, **options, &block)
          end

          def change_setting(table_name, name, value, **options, &block)
            _exec_change_table_with(:change_setting, table_name, name, value, **options, &block)
          end

          def remove_setting(table_name, name, **options, &block)
            _exec_change_table_with(:remove_setting, table_name, name, **options, &block)
          end

          # -- alias ---------------------------------------------------------------------------------------------------

          def add_alias(table_name, name, **options, &block)
            _exec_change_table_with(:add_alias, table_name, name, **options, &block)
          end

          def change_alias(table_name, name, **options, &block)
            _exec_change_table_with(:change_alias, table_name, name, **options, &block)
          end

          def remove_alias(table_name, name, **options, &block)
            _exec_change_table_with(:remove_alias, table_name, name, **options, &block)
          end

          # Wraps a provided +table_name+ with optionally configured +table_name_prefix+ & +table_name_suffix+.
          # This depends on the connection config of the current environment.
          # Prefix and suffix are only added if the name does not already start / end with them.
          #
          # @param [String] table_name
          # @return [String]
          def _env_table_name(table_name)
            table_name = table_name.to_s

            # HINT: +"" creates a new +unfrozen+ string!
            name = +""
            name << table_name_prefix unless table_name.start_with?(table_name_prefix)
            name << table_name
            name << table_name_suffix unless table_name.end_with?(table_name_suffix)

            name
          end

          private

          # Runs a single change-definition +method+ through +change_table+.
          # The +recreate+ flag is forwarded to +change_table+; all remaining arguments
          # are forwarded to the yielded definition.
          def _exec_change_table_with(method, table_name, *args, recreate: false, **kwargs, &block)
            change_table(table_name, recreate: recreate) do |t|
              t.send(method, *args, **kwargs, &block)
            end
          end
        end
      end
    end
  end
end