lib/chrono_model/adapter.rb in chrono_model-1.0.1 vs lib/chrono_model/adapter.rb in chrono_model-1.1.0

- old
+ new

@@ -1,963 +1,180 @@ -require 'active_record' require 'active_record/connection_adapters/postgresql_adapter' -require 'multi_json' +require 'chrono_model/adapter/migrations' +require 'chrono_model/adapter/ddl' +require 'chrono_model/adapter/indexes' +require 'chrono_model/adapter/tsrange' +require 'chrono_model/adapter/upgrade' module ChronoModel # This class implements all ActiveRecord::ConnectionAdapters::SchemaStatements # methods adding support for temporal extensions. It inherits from the Postgres # adapter for a clean override of its methods using super. # class Adapter < ActiveRecord::ConnectionAdapters::PostgreSQLAdapter + include ChronoModel::Adapter::Migrations + include ChronoModel::Adapter::DDL + include ChronoModel::Adapter::Indexes + include ChronoModel::Adapter::TSRange + include ChronoModel::Adapter::Upgrade + # The schema holding current data TEMPORAL_SCHEMA = 'temporal' # The schema holding historical data HISTORY_SCHEMA = 'history' - # This is the data type used for the SCD2 validity - RANGE_TYPE = 'tsrange' - # Returns true whether the connection adapter supports our # implementation of temporal tables. Currently, Chronomodel # is supported starting with PostgreSQL 9.3. # def chrono_supported? postgresql_version >= 90300 end - # Creates the given table, possibly creating the temporal schema - # objects if the `:temporal` option is given and set to true. - # - def create_table(table_name, options = {}) - # No temporal features requested, skip - return super unless options[:temporal] + def chrono_setup! + chrono_ensure_schemas - if options[:id] == false - logger.warn "ChronoModel: Temporal Temporal tables require a primary key." - logger.warn "ChronoModel: Adding a `__chrono_id' primary key to #{table_name} definition." - - options[:id] = '__chrono_id' - end - - transaction do - _on_temporal_schema { super } - _on_history_schema { chrono_create_history_for(table_name) } - - chrono_create_view_for(table_name, options) - end + chrono_upgrade_warning end - # If renaming a temporal table, rename the history and view as well. + # Runs primary_key, indexes and default_sequence_name in the + # temporal schema, as the table there defined is the source for + # this information. # - def rename_table(name, new_name) - return super unless is_chrono?(name) - - clear_cache! - - transaction do - # Rename tables - # - [TEMPORAL_SCHEMA, HISTORY_SCHEMA].each do |schema| - on_schema(schema) do - seq = serial_sequence(name, primary_key(name)) - new_seq = seq.sub(name.to_s, new_name.to_s).split('.').last - - execute "ALTER SEQUENCE #{seq} RENAME TO #{new_seq}" - execute "ALTER TABLE #{name} RENAME TO #{new_name}" - end - end - - - # Rename indexes on history schema - # - _on_history_schema do - standard_index_names = %w( - inherit_pkey instance_history pkey - recorded_at timeline_consistency ) - - old_names = temporal_index_names(name, :validity) + - standard_index_names.map {|i| [name, i].join('_') } - - new_names = temporal_index_names(new_name, :validity) + - standard_index_names.map {|i| [new_name, i].join('_') } - - old_names.zip(new_names).each do |old, new| - execute "ALTER INDEX #{old} RENAME TO #{new}" - end - end - - # Rename indexes on temporal schema - # - _on_temporal_schema do - temporal_indexes = indexes(new_name) - temporal_indexes.map(&:name).each do |old_idx_name| - if old_idx_name =~ /^index_#{name}_on_(?<columns>.+)/ - new_idx_name = "index_#{new_name}_on_#{$~['columns']}" - execute "ALTER INDEX #{old_idx_name} RENAME TO #{new_idx_name}" - end - end - end - - # Drop view - # - execute "DROP VIEW #{name}" - - # Drop functions - # - chrono_drop_trigger_functions_for(name) - - # Create view and functions - # - chrono_create_view_for(new_name) - end - end - - # If changing a temporal table, redirect the change to the table in the - # temporal schema and recreate views. + # Moreover, the PostgreSQLAdapter +indexes+ method uses + # current_schema(), thus this is the only (and cleanest) way to + # make injection work. # - # If the `:temporal` option is specified, enables or disables temporal - # features on the given table. Please note that you'll lose your history - # when demoting a temporal table to a plain one. + # Schema nesting is disabled on these calls, make sure to fetch + # metadata from the first caller's selected schema and not from + # the current one. # - def change_table(table_name, options = {}, &block) - transaction do - - # Add an empty proc to support calling change_table without a block. - # - block ||= proc { } - - if options[:temporal] == true - if !is_chrono?(table_name) - # Add temporal features to this table - # - if !primary_key(table_name) - execute "ALTER TABLE #{table_name} ADD __chrono_id SERIAL PRIMARY KEY" - end - - execute "ALTER TABLE #{table_name} SET SCHEMA #{TEMPORAL_SCHEMA}" - _on_history_schema { chrono_create_history_for(table_name) } - chrono_create_view_for(table_name, options) - copy_indexes_to_history_for(table_name) - - # Optionally copy the plain table data, setting up history - # retroactively. - # - if options[:copy_data] - seq = _on_history_schema { serial_sequence(table_name, primary_key(table_name)) } - from = options[:validity] || '0001-01-01 00:00:00' - - execute %[ - INSERT INTO #{HISTORY_SCHEMA}.#{table_name} - SELECT *, - nextval('#{seq}') AS hid, - tsrange('#{from}', NULL) AS validity, - timezone('UTC', now()) AS recorded_at - FROM #{TEMPORAL_SCHEMA}.#{table_name} - ] - end - end - - chrono_alter(table_name, options) { super table_name, options, &block } - else - if options[:temporal] == false && is_chrono?(table_name) - # Remove temporal features from this table - # - execute "DROP VIEW #{table_name}" - - chrono_drop_trigger_functions_for(table_name) - - _on_history_schema { execute "DROP TABLE #{table_name}" } - - default_schema = select_value 'SELECT current_schema()' - _on_temporal_schema do - if primary_key(table_name) == '__chrono_id' - execute "ALTER TABLE #{table_name} DROP __chrono_id" - end - - execute "ALTER TABLE #{table_name} SET SCHEMA #{default_schema}" - end - end - - super table_name, options, &block - end - end - end - - # If dropping a temporal table, drops it from the temporal schema - # adding the CASCADE option so to delete the history, view and triggers. + # NOTE: These methods are dynamically defined, see the source. # - def drop_table(table_name, *) - return super unless is_chrono?(table_name) - - _on_temporal_schema { execute "DROP TABLE #{table_name} CASCADE" } - - chrono_drop_trigger_functions_for(table_name) + def primary_key(table_name) end - # If adding an index to a temporal table, add it to the one in the - # temporal schema and to the history one. If the `:unique` option is - # present, it is removed from the index created in the history table. - # - def add_index(table_name, column_name, options = {}) - return super unless is_chrono?(table_name) - - transaction do - _on_temporal_schema { super } - - # Uniqueness constraints do not make sense in the history table - options = options.dup.tap {|o| o.delete(:unique)} if options[:unique].present? - - _on_history_schema { super table_name, column_name, options } + [:primary_key, :indexes, :default_sequence_name].each do |method| + define_method(method) do |*args| + table_name = args.first + return super(*args) unless is_chrono?(table_name) + on_schema(TEMPORAL_SCHEMA, recurse: :ignore) { super(*args) } end end - # If removing an index from a temporal table, remove it both from the - # temporal and the history schemas. - # - def remove_index(table_name, *) - return super unless is_chrono?(table_name) - - transaction do - _on_temporal_schema { super } - _on_history_schema { super } - end - end - - # If adding a column to a temporal table, creates it in the table in - # the temporal schema and updates the triggers. - # - def add_column(table_name, *) - return super unless is_chrono?(table_name) - - transaction do - # Add the column to the temporal table - _on_temporal_schema { super } - - # Update the triggers - chrono_create_view_for(table_name) - end - end - - # If renaming a column of a temporal table, rename it in the table in - # the temporal schema and update the triggers. - # - def rename_column(table_name, *) - return super unless is_chrono?(table_name) - - # Rename the column in the temporal table and in the view - transaction do - _on_temporal_schema { super } - super - - # Update the triggers - chrono_create_view_for(table_name) - end - end - - # If removing a column from a temporal table, we are forced to drop the - # view, then change the column from the table in the temporal schema and - # eventually recreate the triggers. - # - def change_column(table_name, *) - return super unless is_chrono?(table_name) - chrono_alter(table_name) { super } - end - - # Change the default on the temporal schema table. - # - def change_column_default(table_name, *) - return super unless is_chrono?(table_name) - _on_temporal_schema { super } - end - - # Change the null constraint on the temporal schema table. - # - def change_column_null(table_name, *) - return super unless is_chrono?(table_name) - _on_temporal_schema { super } - end - - # If removing a column from a temporal table, we are forced to drop the - # view, then drop the column from the table in the temporal schema and - # eventually recreate the triggers. - # - def remove_column(table_name, *) - return super unless is_chrono?(table_name) - chrono_alter(table_name) { super } - end - # Runs column_definitions in the temporal schema, as the table there # defined is the source for this information. # # The default search path is included however, since the table # may reference types defined in other schemas, which result in their # names becoming schema qualified, which will cause type resolutions to fail. # + # NOTE: This method is dynamically defined, see the source. + # + def column_definitions + end + define_method(:column_definitions) do |table_name| return super(table_name) unless is_chrono?(table_name) - on_schema(TEMPORAL_SCHEMA + ',' + self.schema_search_path, false) { super(table_name) } + on_schema(TEMPORAL_SCHEMA + ',' + self.schema_search_path, recurse: :ignore) { super(table_name) } end - # Runs primary_key, indexes and default_sequence_name in the temporal schema, - # as the table there defined is the source for this information. + # Evaluates the given block in the temporal schema. # - # Moreover, the PostgreSQLAdapter +indexes+ method uses current_schema(), - # thus this is the only (and cleanest) way to make injection work. - # - # Schema nesting is disabled on these calls, make sure to fetch metadata - # from the first caller's selected schema and not from the current one. - # - [:primary_key, :indexes, :default_sequence_name].each do |method| - define_method(method) do |*args| - table_name = args.first - return super(*args) unless is_chrono?(table_name) - _on_temporal_schema(false) { super(*args) } - end + def on_temporal_schema(&block) + on_schema(TEMPORAL_SCHEMA, &block) end - # Create spatial indexes for timestamp search. + # Evaluates the given block in the history schema. # - # This index is used by +TimeMachine.at+, `.current` and `.past` to - # build the temporal WHERE clauses that fetch the state of records at - # a single point in time. - # - # Parameters: - # - # `table`: the table where to create indexes on - # `range`: the tsrange field - # - # Options: - # - # `:name`: the index name prefix, defaults to - # index_{table}_temporal_on_{range / lower_range / upper_range} - # - def add_temporal_indexes(table, range, options = {}) - range_idx, lower_idx, upper_idx = - temporal_index_names(table, range, options) - - chrono_alter_index(table, options) do - execute <<-SQL - CREATE INDEX #{range_idx} ON #{table} USING gist ( #{range} ) - SQL - - # Indexes used for precise history filtering, sorting and, in history - # tables, by UPDATE / DELETE triggers. - # - execute "CREATE INDEX #{lower_idx} ON #{table} ( lower(#{range}) )" - execute "CREATE INDEX #{upper_idx} ON #{table} ( upper(#{range}) )" - end + def on_history_schema(&block) + on_schema(HISTORY_SCHEMA, &block) end - def remove_temporal_indexes(table, range, options = {}) - indexes = temporal_index_names(table, range, options) - - chrono_alter_index(table, options) do - indexes.each {|idx| execute "DROP INDEX #{idx}" } - end - end - - def temporal_index_names(table, range, options = {}) - prefix = options[:name].presence || "index_#{table}_temporal" - - # When creating computed indexes (e.g. ends_on::timestamp + time - # '23:59:59'), remove everything following the field name. - range = range.to_s.sub(/\W.*/, '') - - [range, "lower_#{range}", "upper_#{range}"].map do |suffix| - [prefix, 'on', suffix].join('_') - end - end - - - # Adds an EXCLUDE constraint to the given table, to assure that - # no more than one record can occupy a definite segment on a - # timeline. - # - def add_timeline_consistency_constraint(table, range, options = {}) - name = timeline_consistency_constraint_name(table) - id = options[:id] || primary_key(table) - - chrono_alter_constraint(table, options) do - execute <<-SQL - ALTER TABLE #{table} ADD CONSTRAINT #{name} - EXCLUDE USING gist ( #{id} WITH =, #{range} WITH && ) - SQL - end - end - - def remove_timeline_consistency_constraint(table, options = {}) - name = timeline_consistency_constraint_name(options[:prefix] || table) - - chrono_alter_constraint(table, options) do - execute <<-SQL - ALTER TABLE #{table} DROP CONSTRAINT #{name} - SQL - end - end - - def timeline_consistency_constraint_name(table) - "#{table}_timeline_consistency" - end - - # Evaluates the given block in the given +schema+ search path. # - # By default, nested call are allowed, to disable this feature - # pass +false+ as the second parameter. + # Recursion works by saving the old_path the function closure + # at each recursive call. # - def on_schema(schema, nesting = true, &block) - @_on_schema_nesting = (@_on_schema_nesting || 0) + 1 + # See specs for examples and behaviour. + # + def on_schema(schema, recurse: :follow) + old_path = self.schema_search_path - if nesting || @_on_schema_nesting == 1 - old_path = self.schema_search_path - self.schema_search_path = schema + count_recursions do + if recurse == :follow or Thread.current['recursions'] == 1 + self.schema_search_path = schema + end + + yield end - block.call - ensure - if (nesting || @_on_schema_nesting == 1) + # If the transaction is aborted, any execute() call will raise + # "transaction is aborted errors" - thus calling the Adapter's + # setter won't update the memoized variable. + # + # Here we reset it to +nil+ to refresh it on the next call, as + # there is no way to know which path will be restored when the + # transaction ends. + # + transaction_aborted = + @connection.transaction_status == PG::Connection::PQTRANS_INERROR - # If the transaction is aborted, any execute() call will raise - # "transaction is aborted errors" - thus calling the Adapter's - # setter won't update the memoized variable. - # - # Here we reset it to +nil+ to refresh it on the next call, as - # there is no way to know which path will be restored when the - # transaction ends. - # - if @connection.transaction_status == PG::Connection::PQTRANS_INERROR - @schema_search_path = nil - else - self.schema_search_path = old_path - end + if transaction_aborted && Thread.current['recursions'] == 1 + @schema_search_path = nil + else + self.schema_search_path = old_path end - @_on_schema_nesting -= 1 end # Returns true if the given name references a temporal table. # def is_chrono?(table) - _on_temporal_schema { chrono_data_source_exists?(table) } && - _on_history_schema { chrono_data_source_exists?(table) } - - rescue ActiveRecord::StatementInvalid => e - # means that we could not change the search path to check for - # table existence - if is_exception_class?(e, PG::InvalidSchemaName, PG::InvalidParameterValue) - return false - else - raise e - end + on_temporal_schema { data_source_exists?(table) } && + on_history_schema { data_source_exists?(table) } end - def is_exception_class?(e, *klasses) - if e.respond_to?(:original_exception) - klasses.any? { |k| e.is_a?(k) } - else - klasses.any? { |k| e.message =~ /#{k.name}/ } - end - end + # Reads the Gem metadata from the COMMENT set on the given PostgreSQL + # view name. + # + def chrono_metadata_for(view_name) + comment = select_value( + "SELECT obj_description(#{quote(view_name)}::regclass)", + "ChronoModel metadata for #{view_name}") if data_source_exists?(view_name) - def chrono_setup! - chrono_ensure_schemas - - chrono_upgrade_warning + MultiJson.load(comment || '{}').with_indifferent_access end - def chrono_upgrade! - chrono_ensure_schemas - - chrono_upgrade_structure! - end - - # HACK: Redefine tsrange parsing support, as it is broken currently. + # Writes Gem metadata on the COMMENT field in the given VIEW name. # - # This self-made API is here because currently AR4 does not support - # open-ended ranges. The reasons are poor support in Ruby: - # - # https://bugs.ruby-lang.org/issues/6864 - # - # and an instable interface in Active Record: - # - # https://github.com/rails/rails/issues/13793 - # https://github.com/rails/rails/issues/14010 - # - # so, for now, we are implementing our own. - # - class TSRange < ActiveRecord::ConnectionAdapters::PostgreSQL::OID::Range - OID = 3908 + def chrono_metadata_set(view_name, metadata) + comment = MultiJson.dump(metadata) - def cast_value(value) - return if value == 'empty' - return value if value.is_a?(::Array) - - extracted = extract_bounds(value) - - from = Conversions.string_to_utc_time extracted[:from] - to = Conversions.string_to_utc_time extracted[:to ] - - [from, to] - end - - def extract_bounds(value) - from, to = value[1..-2].split(',') - { - from: (value[1] == ',' || from == '-infinity') ? nil : from[1..-2], - to: (value[-2] == ',' || to == 'infinity') ? nil : to[1..-2], - } - end + execute %[ COMMENT ON VIEW #{view_name} IS #{quote(comment)} ] end - def initialize_type_map(m = type_map) - super.tap do - ar_type = type_map.fetch(TSRange::OID) - cm_type = TSRange.new(ar_type.subtype, ar_type.type) + private + # Counts the number of recursions in a thread local variable + # + def count_recursions # yield + Thread.current['recursions'] ||= 0 + Thread.current['recursions'] += 1 - type_map.register_type TSRange::OID, cm_type - end - end + yield - # Copy the indexes from the temporal table to the history table if the indexes - # are not already created with the same name. - # - # Uniqueness is voluntarily ignored, as it doesn't make sense on history - # tables. - # - # Ref: GitHub pull #21. - # - def copy_indexes_to_history_for(table_name) - history_indexes = _on_history_schema { indexes(table_name) }.map(&:name) - temporal_indexes = _on_temporal_schema { indexes(table_name) } - - temporal_indexes.each do |index| - next if history_indexes.include?(index.name) - - _on_history_schema do - execute %[ - CREATE INDEX #{index.name} ON #{table_name} - USING #{index.using} ( #{index.columns.join(', ')} ) - ], 'Copy index from temporal to history' - end + ensure + Thread.current['recursions'] -= 1 end - end - private # Create the temporal and history schemas, unless they already exist # def chrono_ensure_schemas [TEMPORAL_SCHEMA, HISTORY_SCHEMA].each do |schema| execute "CREATE SCHEMA #{schema}" unless schema_exists?(schema) - end - end - - # Locate tables needing a structure upgrade - # - def chrono_tables_needing_upgrade - tables = { } - - _on_temporal_schema { self.tables }.each do |table_name| - next unless is_chrono?(table_name) - metadata = chrono_metadata_for(table_name) - version = metadata['chronomodel'] - - if version.blank? - tables[table_name] = { version: nil, priority: 'CRITICAL' } - elsif version != VERSION - tables[table_name] = { version: version, priority: 'LOW' } - end - end - - return tables - end - - # Emit a warning about tables needing an upgrade - # - def chrono_upgrade_warning - upgrade = chrono_tables_needing_upgrade.map do |table, desc| - "#{table} - priority: #{desc[:priority]}" - end.join('; ') - - return if upgrade.empty? - - logger.warn "ChronoModel: There are tables needing a structure upgrade, and ChronoModel structures need to be recreated." - logger.warn "ChronoModel: Please run ChronoModel.upgrade! to attempt the upgrade. If you have dependant database objects" - logger.warn "ChronoModel: the upgrade will fail and you have to drop the dependent objects, run .upgrade! and create them" - logger.warn "ChronoModel: again. Sorry. Some features or the whole library may not work correctly until upgrade is complete." - logger.warn "ChronoModel: Tables pending upgrade: #{upgrade}" - end - - # Upgrades existing structure for each table, if required. - # - def chrono_upgrade_structure! - transaction do - - chrono_tables_needing_upgrade.each do |table_name, desc| - - if desc[:version].blank? - logger.info "ChronoModel: Upgrading legacy table #{table_name} to #{VERSION}" - upgrade_from_legacy(table_name) - logger.info "ChronoModel: legacy #{table_name} upgrade complete" - else - logger.info "ChronoModel: upgrading #{table_name} from #{desc[:version]} to #{VERSION}" - chrono_create_view_for(table_name) - logger.info "ChronoModel: #{table_name} upgrade complete" - end - - end - end - rescue => e - message = "ChronoModel structure upgrade failed: #{e.message}. Please drop dependent objects first and then run ChronoModel.upgrade! again." - - # Quite important, output it also to stderr. - # - logger.error message - $stderr.puts message - end - - def upgrade_from_legacy(table_name) - # roses are red - # violets are blue - # and this is the most boring piece of code ever - history_table = "#{HISTORY_SCHEMA}.#{table_name}" - p_pkey = primary_key(table_name) - - execute "ALTER TABLE #{history_table} ADD COLUMN validity tsrange;" - execute """ - UPDATE #{history_table} SET validity = tsrange(valid_from, - CASE WHEN extract(year from valid_to) = 9999 THEN NULL - ELSE valid_to - END - ); - """ - - execute "DROP INDEX #{history_table}_temporal_on_valid_from;" - execute "DROP INDEX #{history_table}_temporal_on_valid_from_and_valid_to;" - execute "DROP INDEX #{history_table}_temporal_on_valid_to;" - execute "DROP INDEX #{history_table}_inherit_pkey" - execute "DROP INDEX #{history_table}_recorded_at" - execute "DROP INDEX #{history_table}_instance_history" - execute "ALTER TABLE #{history_table} DROP CONSTRAINT #{table_name}_valid_from_before_valid_to;" - execute "ALTER TABLE #{history_table} DROP CONSTRAINT #{table_name}_timeline_consistency;" - execute "DROP RULE #{table_name}_upd_first ON #{table_name};" - execute "DROP RULE #{table_name}_upd_next ON #{table_name};" - execute "DROP RULE #{table_name}_del ON #{table_name};" - execute "DROP RULE #{table_name}_ins ON #{table_name};" - execute "DROP TRIGGER history_ins ON #{TEMPORAL_SCHEMA}.#{table_name};" - execute "DROP FUNCTION #{TEMPORAL_SCHEMA}.#{table_name}_ins();" - execute "ALTER TABLE #{history_table} DROP COLUMN valid_from;" - execute "ALTER TABLE #{history_table} DROP COLUMN valid_to;" - - execute "CREATE EXTENSION IF NOT EXISTS btree_gist;" - - chrono_create_view_for(table_name) - _on_history_schema { add_history_validity_constraint(table_name, p_pkey) } - _on_history_schema { chrono_create_history_indexes_for(table_name, p_pkey) } - end - - def chrono_metadata_for(table) - comment = select_value( - "SELECT obj_description(#{quote(table)}::regclass)", - "ChronoModel metadata for #{table}") if chrono_data_source_exists?(table) - - MultiJson.load(comment || '{}').with_indifferent_access - end - - def chrono_metadata_set(table, metadata) - comment = MultiJson.dump(metadata) - - execute %[ - COMMENT ON VIEW #{table} IS #{quote(comment)} - ] - end - - def add_history_validity_constraint(table, pkey) - add_timeline_consistency_constraint(table, :validity, :id => pkey, :on_current_schema => true) - end - - def remove_history_validity_constraint(table, options = {}) - remove_timeline_consistency_constraint(table, options.merge(:on_current_schema => true)) - end - - # Create the history table in the history schema - def chrono_create_history_for(table) - parent = "#{TEMPORAL_SCHEMA}.#{table}" - p_pkey = primary_key(parent) - - execute <<-SQL - CREATE TABLE #{table} ( - hid SERIAL PRIMARY KEY, - validity #{RANGE_TYPE} NOT NULL, - recorded_at timestamp NOT NULL DEFAULT timezone('UTC', now()) - ) INHERITS ( #{parent} ) - SQL - - add_history_validity_constraint(table, p_pkey) - - chrono_create_history_indexes_for(table, p_pkey) - end - - def chrono_create_history_indexes_for(table, p_pkey = nil) - # Duplicate because of Migrate.upgrade_indexes_for - # TODO remove me. - p_pkey ||= primary_key("#{TEMPORAL_SCHEMA}.#{table}") - - add_temporal_indexes table, :validity, :on_current_schema => true - - execute "CREATE INDEX #{table}_inherit_pkey ON #{table} ( #{p_pkey} )" - execute "CREATE INDEX #{table}_recorded_at ON #{table} ( recorded_at )" - execute "CREATE INDEX #{table}_instance_history ON #{table} ( #{p_pkey}, recorded_at )" - end - - # Create the public view and its INSTEAD OF triggers - # - def chrono_create_view_for(table, options = nil) - pk = primary_key(table) - current = [TEMPORAL_SCHEMA, table].join('.') - history = [HISTORY_SCHEMA, table].join('.') - seq = serial_sequence(current, pk) - - options ||= chrono_metadata_for(table) - - # SELECT - return only current data - # - execute "DROP VIEW #{table}" if chrono_data_source_exists? table - execute "CREATE VIEW #{table} AS SELECT * FROM ONLY #{current}" - - # Set default values on the view (closes #12) - # - chrono_metadata_set(table, options.merge(:chronomodel => VERSION)) - - columns(table).each do |column| - default = if column.default.nil? - column.default_function - else - if ActiveRecord::VERSION::MAJOR == 4 - quote(column.default, column) - else # Rails 5 and beyond - quote(column.default) - end - end - - next if column.name == pk || default.nil? - - execute "ALTER VIEW #{table} ALTER COLUMN #{quote_column_name(column.name)} SET DEFAULT #{default}" - end - - columns = columns(table).map {|c| quote_column_name(c.name)} - columns.delete(quote_column_name(pk)) - - fields, values = columns.join(', '), columns.map {|c| "NEW.#{c}"}.join(', ') - - # Columns to be journaled. By default everything except updated_at (GH #7) - # - journal = if options[:journal] - options[:journal].map {|col| quote_column_name(col)} - - elsif options[:no_journal] - columns - options[:no_journal].map {|col| quote_column_name(col)} - - elsif options[:full_journal] - columns - - else - columns - [ quote_column_name('updated_at') ] - end - - journal &= columns - - # INSERT - insert data both in the temporal table and in the history one. - # - # The serial sequence is invoked manually only if the PK is NULL, to - # allow setting the PK to a specific value (think migration scenario). - # - execute <<-SQL - CREATE OR REPLACE FUNCTION chronomodel_#{table}_insert() RETURNS TRIGGER AS $$ - BEGIN - IF NEW.#{pk} IS NULL THEN - NEW.#{pk} := nextval('#{seq}'); - END IF; - - INSERT INTO #{current} ( #{pk}, #{fields} ) - VALUES ( NEW.#{pk}, #{values} ); - - INSERT INTO #{history} ( #{pk}, #{fields}, validity ) - VALUES ( NEW.#{pk}, #{values}, tsrange(timezone('UTC', now()), NULL) ); - - RETURN NEW; - END; - $$ LANGUAGE plpgsql; - - DROP TRIGGER IF EXISTS chronomodel_insert ON #{table}; - - CREATE TRIGGER chronomodel_insert INSTEAD OF INSERT ON #{table} - FOR EACH ROW EXECUTE PROCEDURE chronomodel_#{table}_insert(); - SQL - - # UPDATE - set the last history entry validity to now, save the current data - # in a new history entry and update the temporal table with the new data. - # - # If there are no changes, this trigger suppresses redundant updates. - # - # If a row in the history with the current ID and current timestamp already - # exists, update it with new data. This logic makes possible to "squash" - # together changes made in a transaction in a single history row. - # - # If you want to disable this behaviour, set the CHRONOMODEL_NO_SQUASH - # environment variable. This is useful when running scenarios inside - # cucumber, in which everything runs in the same transaction. - # - execute <<-SQL - CREATE OR REPLACE FUNCTION chronomodel_#{table}_update() RETURNS TRIGGER AS $$ - DECLARE _now timestamp; - DECLARE _hid integer; - DECLARE _old record; - DECLARE _new record; - BEGIN - IF OLD IS NOT DISTINCT FROM NEW THEN - RETURN NULL; - END IF; - - _old := row(#{journal.map {|c| "OLD.#{c}" }.join(', ')}); - _new := row(#{journal.map {|c| "NEW.#{c}" }.join(', ')}); - - IF _old IS NOT DISTINCT FROM _new THEN - UPDATE ONLY #{current} SET ( #{fields} ) = ( #{values} ) WHERE #{pk} = OLD.#{pk}; - RETURN NEW; - END IF; - - _now := timezone('UTC', now()); - _hid := NULL; - - #{"SELECT hid INTO _hid FROM #{history} WHERE #{pk} = OLD.#{pk} AND lower(validity) = _now;" unless ENV['CHRONOMODEL_NO_SQUASH']} - - IF _hid IS NOT NULL THEN - UPDATE #{history} SET ( #{fields} ) = ( #{values} ) WHERE hid = _hid; - ELSE - UPDATE #{history} SET validity = tsrange(lower(validity), _now) - WHERE #{pk} = OLD.#{pk} AND upper_inf(validity); - - INSERT INTO #{history} ( #{pk}, #{fields}, validity ) - VALUES ( OLD.#{pk}, #{values}, tsrange(_now, NULL) ); - END IF; - - UPDATE ONLY #{current} SET ( #{fields} ) = ( #{values} ) WHERE #{pk} = OLD.#{pk}; - - RETURN NEW; - END; - $$ LANGUAGE plpgsql; - - DROP TRIGGER IF EXISTS chronomodel_update ON #{table}; - - CREATE TRIGGER chronomodel_update INSTEAD OF UPDATE ON #{table} - FOR EACH ROW EXECUTE PROCEDURE chronomodel_#{table}_update(); - SQL - - # DELETE - save the current data in the history and eventually delete the - # data from the temporal table. - # The first DELETE is required to remove history for records INSERTed and - # DELETEd in the same transaction. - # - execute <<-SQL - CREATE OR REPLACE FUNCTION chronomodel_#{table}_delete() RETURNS TRIGGER AS $$ - DECLARE _now timestamp; - BEGIN - _now := timezone('UTC', now()); - - DELETE FROM #{history} - WHERE #{pk} = old.#{pk} AND validity = tsrange(_now, NULL); - - UPDATE #{history} SET validity = tsrange(lower(validity), _now) - WHERE #{pk} = old.#{pk} AND upper_inf(validity); - - DELETE FROM ONLY #{current} - WHERE #{pk} = old.#{pk}; - - RETURN OLD; - END; - $$ LANGUAGE plpgsql; - - DROP TRIGGER IF EXISTS chronomodel_delete ON #{table}; - - CREATE TRIGGER chronomodel_delete INSTEAD OF DELETE ON #{table} - FOR EACH ROW EXECUTE PROCEDURE chronomodel_#{table}_delete(); - SQL - end - - def chrono_drop_trigger_functions_for(table_name) - %w( insert update delete ).each do |func| - execute "DROP FUNCTION IF EXISTS chronomodel_#{table_name}_#{func}()" - end - end - - # In destructive changes, such as removing columns or changing column - # types, the view must be dropped and recreated, while the change has - # to be applied to the table in the temporal schema. - # - def chrono_alter(table_name, opts = {}) - transaction do - options = chrono_metadata_for(table_name).merge(opts) - - execute "DROP VIEW #{table_name}" - - _on_temporal_schema { yield } - - # Recreate the triggers - chrono_create_view_for(table_name, options) - end - end - - # Generic alteration of history tables, where changes have to be - # propagated both on the temporal table and the history one. - # - # Internally, the :on_current_schema bypasses the +is_chrono?+ - # check, as some temporal indexes and constraints are created - # only on the history table, and the creation methods already - # run scoped into the correct schema. - # - def chrono_alter_index(table_name, options) - if is_chrono?(table_name) && !options[:on_current_schema] - _on_temporal_schema { yield } - _on_history_schema { yield } - else - yield - end - end - - def chrono_alter_constraint(table_name, options) - if is_chrono?(table_name) && !options[:on_current_schema] - _on_temporal_schema { yield } - else - yield - end - end - - def chrono_data_source_exists?(table_name) - if ActiveRecord::VERSION::MAJOR >= 5 - data_source_exists?(table_name) - else - # On Rails 4, table_exists? has the same behaviour, checking if both - # a view or table exists - table_exists?(table_name) - end - end - - def _on_temporal_schema(nesting = true, &block) - on_schema(TEMPORAL_SCHEMA, nesting, &block) - end - - def _on_history_schema(nesting = true, &block) - on_schema(HISTORY_SCHEMA, nesting, &block) - end - - def translate_exception(exception, message) - if exception.message =~ /conflicting key value violates exclusion constraint/ - ActiveRecord::RecordNotUnique.new(message) - else - super end end end end