# frozen_string_literal: true module ActiveRecord module ConnectionAdapters module SQLite3 module SchemaStatements # :nodoc: # Returns an array of indexes for the given table. def indexes(table_name) internal_exec_query("PRAGMA index_list(#{quote_table_name(table_name)})", "SCHEMA").filter_map do |row| # Indexes SQLite creates implicitly for internal use start with "sqlite_". # See https://www.sqlite.org/fileformat2.html#intschema next if row["name"].start_with?("sqlite_") index_sql = query_value(<<~SQL, "SCHEMA") SELECT sql FROM sqlite_master WHERE name = #{quote(row['name'])} AND type = 'index' UNION ALL SELECT sql FROM sqlite_temp_master WHERE name = #{quote(row['name'])} AND type = 'index' SQL /\bON\b\s*"?(\w+?)"?\s*\((?.+?)\)(?:\s*WHERE\b\s*(?.+))?(?:\s*\/\*.*\*\/)?\z/i =~ index_sql columns = internal_exec_query("PRAGMA index_info(#{quote(row['name'])})", "SCHEMA").map do |col| col["name"] end orders = {} if columns.any?(&:nil?) # index created with an expression columns = expressions else # Add info on sort order for columns (only desc order is explicitly specified, # asc is the default) if index_sql # index_sql can be null in case of primary key indexes index_sql.scan(/"(\w+)" DESC/).flatten.each { |order_column| orders[order_column] = :desc } end end IndexDefinition.new( table_name, row["name"], row["unique"] != 0, columns, where: where, orders: orders ) end end def add_foreign_key(from_table, to_table, **options) assert_valid_deferrable(options[:deferrable]) alter_table(from_table) do |definition| to_table = strip_table_name_prefix_and_suffix(to_table) definition.foreign_key(to_table, **options) end end def remove_foreign_key(from_table, to_table = nil, **options) return if options.delete(:if_exists) == true && !foreign_key_exists?(from_table, to_table) to_table ||= options[:to_table] options = options.except(:name, :to_table, :validate) foreign_keys = foreign_keys(from_table) fkey = foreign_keys.detect do |fk| table = to_table || begin table = options[:column].to_s.delete_suffix("_id") Base.pluralize_table_names ? table.pluralize : table end table = strip_table_name_prefix_and_suffix(table) fk_to_table = strip_table_name_prefix_and_suffix(fk.to_table) fk_to_table == table && options.all? { |k, v| fk.options[k].to_s == v.to_s } end || raise(ArgumentError, "Table '#{from_table}' has no foreign key for #{to_table || options}") foreign_keys.delete(fkey) alter_table(from_table, foreign_keys) end def virtual_table_exists?(table_name) query_values(data_source_sql(table_name, type: "VIRTUAL TABLE"), "SCHEMA").any? end def check_constraints(table_name) table_sql = query_value(<<-SQL, "SCHEMA") SELECT sql FROM sqlite_master WHERE name = #{quote(table_name)} AND type = 'table' UNION ALL SELECT sql FROM sqlite_temp_master WHERE name = #{quote(table_name)} AND type = 'table' SQL table_sql.to_s.scan(/CONSTRAINT\s+(?\w+)\s+CHECK\s+\((?(:?[^()]|\(\g\))+)\)/i).map do |name, expression| CheckConstraintDefinition.new(table_name, expression, name: name) end end def add_check_constraint(table_name, expression, **options) alter_table(table_name) do |definition| definition.check_constraint(expression, **options) end end def remove_check_constraint(table_name, expression = nil, if_exists: false, **options) return if if_exists && !check_constraint_exists?(table_name, **options) check_constraints = check_constraints(table_name) chk_name_to_delete = check_constraint_for!(table_name, expression: expression, **options).name check_constraints.delete_if { |chk| chk.name == chk_name_to_delete } alter_table(table_name, foreign_keys(table_name), check_constraints) end def create_schema_dumper(options) SQLite3::SchemaDumper.create(self, options) end def schema_creation # :nodoc SQLite3::SchemaCreation.new(self) end private def valid_table_definition_options super + [:rename] end def create_table_definition(name, **options) SQLite3::TableDefinition.new(self, name, **options) end def validate_index_length!(table_name, new_name, internal = false) super unless internal end def new_column_from_field(table_name, field, definitions) default = field["dflt_value"] type_metadata = fetch_type_metadata(field["type"]) default_value = extract_value_from_default(default) generated_type = extract_generated_type(field) if generated_type.present? default_function = default else default_function = extract_default_function(default_value, default) end rowid = is_column_the_rowid?(field, definitions) Column.new( field["name"], default_value, type_metadata, field["notnull"].to_i == 0, default_function, collation: field["collation"], auto_increment: field["auto_increment"], rowid: rowid, generated_type: generated_type ) end INTEGER_REGEX = /integer/i # if a rowid table has a primary key that consists of a single column # and the declared type of that column is "INTEGER" in any mixture of upper and lower case, # then the column becomes an alias for the rowid. def is_column_the_rowid?(field, column_definitions) return false unless INTEGER_REGEX.match?(field["type"]) && field["pk"] == 1 # is the primary key a single column? column_definitions.one? { |c| c["pk"] > 0 } end def data_source_sql(name = nil, type: nil) scope = quoted_scope(name, type: type) scope[:type] ||= "'table','view'" sql = +"SELECT name FROM pragma_table_list WHERE schema <> 'temp'" sql << " AND name NOT IN ('sqlite_sequence', 'sqlite_schema')" sql << " AND name = #{scope[:name]}" if scope[:name] sql << " AND type IN (#{scope[:type]})" sql end def quoted_scope(name = nil, type: nil) type = \ case type when "BASE TABLE" "'table'" when "VIEW" "'view'" when "VIRTUAL TABLE" "'virtual'" end scope = {} scope[:name] = quote(name) if name scope[:type] = type if type scope end def assert_valid_deferrable(deferrable) return if !deferrable || %i(immediate deferred).include?(deferrable) raise ArgumentError, "deferrable must be `:immediate` or `:deferred`, got: `#{deferrable.inspect}`" end def extract_generated_type(field) case field["hidden"] when 2 then :virtual when 3 then :stored end end end end end end