lib/chrono_model/adapter.rb in chrono_model-0.5.3 vs lib/chrono_model/adapter.rb in chrono_model-0.8.0

- old
+ new

@@ -12,14 +12,19 @@ TEMPORAL_SCHEMA = 'temporal' # The schema holding historical data HISTORY_SCHEMA = 'history' - # Chronomodel is supported starting with PostgreSQL >= 9.0 + # This is the data type used for the SCD2 validity + RANGE_TYPE = 'tsrange' + + # Returns true whether the connection adapter supports our + # implementation of temporal tables. Currently, Chronomodel + # is supported starting with PostgreSQL 9.3. # def chrono_supported? - postgresql_version >= 90000 + postgresql_version >= 90300 end # Creates the given table, possibly creating the temporal schema # objects if the `:temporal` option is given and set to true. # @@ -32,18 +37,15 @@ logger.warn "WARNING - Creating a \"__chrono_id\" primary key to fulfill the requirement" options[:id] = '__chrono_id' end - # Create required schemas - chrono_create_schemas! - transaction do _on_temporal_schema { super } _on_history_schema { chrono_create_history_for(table_name) } - chrono_create_view_for(table_name) + chrono_create_view_for(table_name, options) TableCache.add! table_name end end @@ -53,20 +55,49 @@ return super unless is_chrono?(name) clear_cache! transaction do + # Rename tables + # [TEMPORAL_SCHEMA, HISTORY_SCHEMA].each do |schema| on_schema(schema) do seq = serial_sequence(name, primary_key(name)) new_seq = seq.sub(name.to_s, new_name.to_s).split('.').last execute "ALTER SEQUENCE #{seq} RENAME TO #{new_seq}" execute "ALTER TABLE #{name} RENAME TO #{new_name}" end end + # Rename indexes + # + pkey = primary_key(new_name) + _on_history_schema do + standard_index_names = %w( + inherit_pkey instance_history pkey + recorded_at timeline_consistency ) + + old_names = temporal_index_names(name, :validity) + + standard_index_names.map {|i| [name, i].join('_') } + + new_names = temporal_index_names(new_name, :validity) + + standard_index_names.map {|i| [new_name, i].join('_') } + + old_names.zip(new_names).each do |old, new| + execute "ALTER INDEX #{old} RENAME TO #{new}" + end + end + + # Rename functions + # + %w( insert update delete ).each do |func| + execute "ALTER FUNCTION chronomodel_#{name}_#{func}() RENAME TO chronomodel_#{new_name}_#{func}" + end + + # Rename the public view + # execute "ALTER VIEW #{name} RENAME TO #{new_name}" TableCache.del! name TableCache.add! new_name end @@ -94,11 +125,12 @@ execute "ALTER TABLE #{table_name} ADD __chrono_id SERIAL PRIMARY KEY" end execute "ALTER TABLE #{table_name} SET SCHEMA #{TEMPORAL_SCHEMA}" _on_history_schema { chrono_create_history_for(table_name) } - chrono_create_view_for(table_name) + chrono_create_view_for(table_name, options) + copy_indexes_to_history_for(table_name) TableCache.add! table_name # Optionally copy the plain table data, setting up history # retroactively. @@ -108,14 +140,13 @@ from = options[:validity] || '0001-01-01 00:00:00' execute %[ INSERT INTO #{HISTORY_SCHEMA}.#{table_name} SELECT *, - nextval('#{seq}') AS hid, - timestamp '#{from}' AS valid_from, - timestamp '9999-12-31 00:00:00' AS valid_to, - timezone('UTC', now()) AS recorded_at + nextval('#{seq}') AS hid, + tsrange('#{from}', NULL) AS validity, + timezone('UTC', now()) AS recorded_at FROM #{TEMPORAL_SCHEMA}.#{table_name} ] end end @@ -123,11 +154,17 @@ else if options[:temporal] == false && is_chrono?(table_name) # Remove temporal features from this table # execute "DROP VIEW #{table_name}" - _on_temporal_schema { execute "DROP FUNCTION IF EXISTS #{table_name}_ins() CASCADE" } + + _on_temporal_schema do + %w( insert update delete ).each do |func| + execute "DROP FUNCTION IF EXISTS #{table_name}_#{func}() CASCADE" + end + end + _on_history_schema { execute "DROP TABLE #{table_name}" } default_schema = select_value 'SELECT current_schema()' _on_temporal_schema do if primary_key(table_name) == '__chrono_id' @@ -144,11 +181,11 @@ end end end # If dropping a temporal table, drops it from the temporal schema - # adding the CASCADE option so to delete the history, view and rules. + # adding the CASCADE option so to delete the history, view and triggers. # def drop_table(table_name, *) return super unless is_chrono?(table_name) _on_temporal_schema { execute "DROP TABLE #{table_name} CASCADE" } @@ -184,43 +221,43 @@ _on_history_schema { super } end end # If adding a column to a temporal table, creates it in the table in - # the temporal schema and updates the view rules. + # the temporal schema and updates the triggers. # def add_column(table_name, *) return super unless is_chrono?(table_name) transaction do # Add the column to the temporal table _on_temporal_schema { super } - # Update the rules + # Update the triggers chrono_create_view_for(table_name) end end # If renaming a column of a temporal table, rename it in the table in - # the temporal schema and update the view rules. + # the temporal schema and update the triggers. # def rename_column(table_name, *) return super unless is_chrono?(table_name) # Rename the column in the temporal table and in the view transaction do _on_temporal_schema { super } super - # Update the rules + # Update the triggers chrono_create_view_for(table_name) end end # If removing a column from a temporal table, we are forced to drop the # view, then change the column from the table in the temporal schema and - # eventually recreate the rules. + # eventually recreate the triggers. # def change_column(table_name, *) return super unless is_chrono?(table_name) chrono_alter(table_name) { super } end @@ -239,11 +276,11 @@ _on_temporal_schema { super } end # If removing a column from a temporal table, we are forced to drop the # view, then drop the column from the table in the temporal schema and - # eventually recreate the rules. + # eventually recreate the triggers. # def remove_column(table_name, *) return super unless is_chrono?(table_name) chrono_alter(table_name) { super } end @@ -262,130 +299,83 @@ return super(table_name) unless is_chrono?(table_name) _on_temporal_schema(false) { super(table_name) } end end - # Create spatial indexes for timestamp search. Conceptually identical - # to the EXCLUDE constraint in chrono_create_history_for, but without - # the millisecond skew. + # Create spatial indexes for timestamp search. # # This index is used by +TimeMachine.at+, `.current` and `.past` to # build the temporal WHERE clauses that fetch the state of records at # a single point in time. # # Parameters: # # `table`: the table where to create indexes on - # `from` : the starting timestamp field - # `to` : the ending timestamp field + # `range`: the tsrange field # # Options: # # `:name`: the index name prefix, defaults to - # {table_name}_temporal_{snapshot / from_col / to_col} + # index_{table}_temporal_on_{range / lower_range / upper_range} # - def add_temporal_indexes(table, from, to, options = {}) - snapshot_idx, from_idx, to_idx = - temporal_index_names(table, from, to, options) + def add_temporal_indexes(table, range, options = {}) + range_idx, lower_idx, upper_idx = + temporal_index_names(table, range, options) chrono_alter_index(table, options) do execute <<-SQL - CREATE INDEX #{snapshot_idx} ON #{table} USING gist ( - box( - point( date_part( 'epoch', #{from} ), 0 ), - point( date_part( 'epoch', #{to } ), 0 ) - ) - ) + CREATE INDEX #{range_idx} ON #{table} USING gist ( #{range} ) SQL # Indexes used for precise history filtering, sorting and, in history - # tables, by UPDATE / DELETE rules + # tables, by UPDATE / DELETE triggers. # - execute "CREATE INDEX #{from_idx} ON #{table} ( #{from} )" - execute "CREATE INDEX #{to_idx } ON #{table} ( #{to } )" + execute "CREATE INDEX #{lower_idx} ON #{table} ( lower(#{range}) )" + execute "CREATE INDEX #{upper_idx} ON #{table} ( upper(#{range}) )" end end - def remove_temporal_indexes(table, from, to, options = {}) - indexes = temporal_index_names(table, from, to, options) + def remove_temporal_indexes(table, range, options = {}) + indexes = temporal_index_names(table, range, options) chrono_alter_index(table, options) do indexes.each {|idx| execute "DROP INDEX #{idx}" } end end - def temporal_index_names(table, from, to, options) - prefix = options[:name].presence || "#{table}_temporal" + def temporal_index_names(table, range, options = {}) + prefix = options[:name].presence || "index_#{table}_temporal" - ["#{from}_and_#{to}", from, to].map do |suffix| + # When creating computed indexes (e.g. ends_on::timestamp + time + # '23:59:59'), remove everything following the field name. + range = range.to_s.sub(/\W.*/, '') + + [range, "lower_#{range}", "upper_#{range}"].map do |suffix| [prefix, 'on', suffix].join('_') end end - # Adds a CHECK constraint to the given +table+, to assure that - # the value contained in the +from+ field is, by default, less - # than the value contained in the +to+ field. - # - # The default `<` operator can be changed using the `:op` option. - # - def add_from_before_to_constraint(table, from, to, options = {}) - operator = options[:op].presence || '<' - name = from_before_to_constraint_name(table, from, to) - - chrono_alter_constraint(table, options) do - execute <<-SQL - ALTER TABLE #{table} ADD CONSTRAINT - #{name} CHECK (#{from} #{operator} #{to}) - SQL - end - end - - def remove_from_before_to_constraint(table, from, to, options = {}) - name = from_before_to_constraint_name(table, from, to) - - chrono_alter_constraint(table, options) do - execute <<-SQL - ALTER TABLE #{table} DROP CONSTRAINT #{name} - SQL - end - end - - def from_before_to_constraint_name(table, from, to) - "#{table}_#{from}_before_#{to}" - end - - # Adds an EXCLUDE constraint to the given table, to assure that # no more than one record can occupy a definite segment on a # timeline. # - # The exclusion is implemented using a spatial gist index, that - # uses boxes under the hood. Different records are identified by - # default using the table's primary key - or you can specify your - # field (or composite field) using the `:id` option. - # - def add_timeline_consistency_constraint(table, from, to, options = {}) + def add_timeline_consistency_constraint(table, range, options = {}) name = timeline_consistency_constraint_name(table) - id = options[:id] || primary_key(table) + id = options[:id] || primary_key(table) chrono_alter_constraint(table, options) do execute <<-SQL - ALTER TABLE #{table} ADD CONSTRAINT - #{name} EXCLUDE USING gist ( - box( - point( date_part( 'epoch', #{from} ), #{id} ), - point( date_part( 'epoch', #{to } - INTERVAL '1 msec' ), #{id} ) - ) - WITH && - ) + ALTER TABLE #{table} ADD CONSTRAINT #{name} + EXCLUDE USING gist ( #{id} WITH =, #{range} WITH && ) SQL end end - def remove_timeline_consistency_constraint(table, from, to, options = {}) - name = timeline_consistency_constraint_name(table) + def remove_timeline_consistency_constraint(table, options = {}) + name = timeline_consistency_constraint_name(options[:prefix] || table) + chrono_alter_constraint(table, options) do execute <<-SQL ALTER TABLE #{table} DROP CONSTRAINT #{name} SQL end @@ -429,11 +419,11 @@ end end @_on_schema_nesting -= 1 end - TableCache = (Class.new(HashWithIndifferentAccess) do + TableCache = (Class.new(Hash) do def all ; keys; ; end def add! table ; self[table.to_s] = true ; end def del! table ; self[table.to_s] = nil ; end def fetch table ; self[table.to_s] ||= yield ; end end).new @@ -443,220 +433,364 @@ def is_chrono?(table) TableCache.fetch(table) do _on_temporal_schema { table_exists?(table) } && _on_history_schema { table_exists?(table) } end + + rescue ActiveRecord::StatementInvalid => e + # means that we could not change the search path to check for + # table existence + if is_exception_class?(e, PG::InvalidSchemaName, PG::InvalidParameterValue) + return false + else + raise e + end end - def chrono_create_schemas! - [TEMPORAL_SCHEMA, HISTORY_SCHEMA].each do |schema| - execute "CREATE SCHEMA #{schema}" unless schema_exists?(schema) + def is_exception_class?(e, *klasses) + if e.respond_to?(:original_exception) + klasses.any? { |k| e.is_a?(k) } + else + klasses.any? { |k| e.message =~ /#{k.name}/ } end end - # Disable savepoints support, as they break history keeping. - # http://archives.postgresql.org/pgsql-hackers/2012-08/msg01094.php + # HACK: Redefine tsrange parsing support, as it is broken currently. # - def supports_savepoints? - false + # This self-made API is here because currently AR4 does not support + # open-ended ranges. The reasons are poor support in Ruby: + # + # https://bugs.ruby-lang.org/issues/6864 + # + # and an instable interface in Active Record: + # + # https://github.com/rails/rails/issues/13793 + # https://github.com/rails/rails/issues/14010 + # + # so, for now, we are implementing our own. + # + class TSRange < OID::Type + def extract_bounds(value) + from, to = value[1..-2].split(',') + { + from: (value[1] == ',' || from == '-infinity') ? nil : from[1..-2], + to: (value[-2] == ',' || to == 'infinity') ? nil : to[1..-2], + #exclude_start: (value[0] == '('), + #exclude_end: (value[-1] == ')') + } + end + + def type_cast(value) + extracted = extract_bounds(value) + + from = Conversions.string_to_utc_time extracted[:from] + to = Conversions.string_to_utc_time extracted[:to ] + + [from, to] + end end - def create_savepoint; end - def rollback_to_savepoint; end - def release_savepoint; end + def chrono_setup! + chrono_create_schemas + chrono_setup_type_map + chrono_upgrade_structure! + end + + # Copy the indexes from the temporal table to the history table if the indexes + # are not already created with the same name. + # + # Uniqueness is voluntarily ignored, as it doesn't make sense on history + # tables. + # + # Ref: GitHub pull #21. + # + def copy_indexes_to_history_for(table_name) + history_indexes = _on_history_schema { indexes(table_name) }.map(&:name) + temporal_indexes = _on_temporal_schema { indexes(table_name) } + + temporal_indexes.each do |index| + next if history_indexes.include?(index.name) + + _on_history_schema do + execute %[ + CREATE INDEX #{index.name} ON #{table_name} + USING #{index.using} ( #{index.columns.join(', ')} ) + ], 'Copy index from temporal to history' + end + end + end + private + # Create the temporal and history schemas, unless they already exist + # + def chrono_create_schemas + [TEMPORAL_SCHEMA, HISTORY_SCHEMA].each do |schema| + execute "CREATE SCHEMA #{schema}" unless schema_exists?(schema) + end + end + + # Adds the above TSRange class to the PG Adapter OID::TYPE_MAP + # + def chrono_setup_type_map + OID::TYPE_MAP[3908] = TSRange.new + end + + # Upgrades existing structure for each table, if required. + # TODO: allow upgrades from pre-0.6 structure with box() and stuff. + # + def chrono_upgrade_structure! + transaction do + current = VERSION + + _on_temporal_schema { tables }.each do |table_name| + next unless is_chrono?(table_name) + metadata = chrono_metadata_for(table_name) + version = metadata['chronomodel'] + + if version.blank? # FIXME + raise Error, "ChronoModel found structures created by a too old version. Cannot upgrade right now." + end + + next if version == current + + logger.info "ChronoModel: upgrading #{table_name} from #{version} to #{current}" + chrono_create_view_for(table_name) + logger.info "ChronoModel: upgrade complete" + end + end + end + + def chrono_metadata_for(table) + comment = select_value( + "SELECT obj_description(#{quote(table)}::regclass)", + "ChronoModel metadata for #{table}") if table_exists?(table) + + MultiJson.load(comment || '{}').with_indifferent_access + end + + def chrono_metadata_set(table, metadata) + comment = MultiJson.dump(metadata) + + execute %[ + COMMENT ON VIEW #{table} IS #{quote(comment)} + ] + end + + def add_history_validity_constraint(table, pkey) + add_timeline_consistency_constraint(table, :validity, :id => pkey, :on_current_schema => true) + end + + def remove_history_validity_constraint(table, options = {}) + remove_timeline_consistency_constraint(table, options.merge(:on_current_schema => true)) + end + # Create the history table in the history schema def chrono_create_history_for(table) parent = "#{TEMPORAL_SCHEMA}.#{table}" p_pkey = primary_key(parent) execute <<-SQL CREATE TABLE #{table} ( hid SERIAL PRIMARY KEY, - valid_from timestamp NOT NULL, - valid_to timestamp NOT NULL DEFAULT '9999-12-31', + validity #{RANGE_TYPE} NOT NULL, recorded_at timestamp NOT NULL DEFAULT timezone('UTC', now()) ) INHERITS ( #{parent} ) SQL - add_from_before_to_constraint(table, :valid_from, :valid_to, - :on_current_schema => true) + add_history_validity_constraint(table, p_pkey) - add_timeline_consistency_constraint(table, :valid_from, :valid_to, - :id => p_pkey, :on_current_schema => true) - - # Inherited primary key - execute "CREATE INDEX #{table}_inherit_pkey ON #{table} ( #{p_pkey} )" - chrono_create_history_indexes_for(table, p_pkey) end def chrono_create_history_indexes_for(table, p_pkey = nil) # Duplicate because of Migrate.upgrade_indexes_for # TODO remove me. p_pkey ||= primary_key("#{TEMPORAL_SCHEMA}.#{table}") - add_temporal_indexes table, :valid_from, :valid_to, - :on_current_schema => true + add_temporal_indexes table, :validity, :on_current_schema => true + execute "CREATE INDEX #{table}_inherit_pkey ON #{table} ( #{p_pkey} )" execute "CREATE INDEX #{table}_recorded_at ON #{table} ( recorded_at )" execute "CREATE INDEX #{table}_instance_history ON #{table} ( #{p_pkey}, recorded_at )" end - # Create the public view and its rewrite rules + # Create the public view and its INSTEAD OF triggers # - def chrono_create_view_for(table) + def chrono_create_view_for(table, options = nil) pk = primary_key(table) current = [TEMPORAL_SCHEMA, table].join('.') history = [HISTORY_SCHEMA, table].join('.') + seq = serial_sequence(current, pk) + options ||= chrono_metadata_for(table) + # SELECT - return only current data # execute "DROP VIEW #{table}" if table_exists? table - execute "CREATE VIEW #{table} AS SELECT *, xmin AS __xid FROM ONLY #{current}" + execute "CREATE VIEW #{table} AS SELECT * FROM ONLY #{current}" - columns = columns(table).map{|c| quote_column_name(c.name)} + # Set default values on the view (closes #12) + # + chrono_metadata_set(table, options.merge(:chronomodel => VERSION)) + + columns(table).each do |column| + default = column.default.nil? ? column.default_function : quote(column.default, column) + next if column.name == pk || default.nil? + + execute "ALTER VIEW #{table} ALTER COLUMN #{column.name} SET DEFAULT #{default}" + end + + columns = columns(table).map {|c| quote_column_name(c.name)} columns.delete(quote_column_name(pk)) - updates = columns.map {|c| "#{c} = new.#{c}"}.join(",\n") + fields, values = columns.join(', '), columns.map {|c| "NEW.#{c}"}.join(', ') - fields, values = columns.join(', '), columns.map {|c| "new.#{c}"}.join(', ') - fields_with_pk, values_with_pk = "#{pk}, " << fields, "new.#{pk}, " << values + # Columns to be journaled. By default everything except updated_at (GH #7) + # + journal = if options[:journal] + options[:journal].map {|col| quote_column_name(col)} + elsif options[:no_journal] + columns - options[:no_journal].map {|col| quote_column_name(col)} + + elsif options[:full_journal] + columns + + else + columns - [ quote_column_name('updated_at') ] + end + + journal &= columns + # INSERT - insert data both in the temporal table and in the history one. # - # A trigger is required if there is a serial ID column, as rules by - # design cannot handle the following case: + # The serial sequence is invoked manually only if the PK is NULL, to + # allow setting the PK to a specific value (think migration scenario). # - # * INSERT INTO ... SELECT: if using currval(), all the rows - # inserted in the history will have the same identity value; - # - # * if using a separate sequence to solve the above case, it may go - # out of sync with the main one if an INSERT statement fails due - # to a table constraint (the first one is nextval()'ed but the - # nextval() on the history one never happens) - # - # So, only for this case, we resort to an AFTER INSERT FOR EACH ROW trigger. - # - # Ref: GH Issue #4. - # - if serial_sequence(current, pk).present? - execute <<-SQL - CREATE RULE #{table}_ins AS ON INSERT TO #{table} DO INSTEAD ( - INSERT INTO #{current} ( #{fields} ) VALUES ( #{values} ) - RETURNING #{pk}, #{fields}, xmin - ); + execute <<-SQL + CREATE OR REPLACE FUNCTION chronomodel_#{table}_insert() RETURNS TRIGGER AS $$ + BEGIN + IF NEW.#{pk} IS NULL THEN + NEW.#{pk} := nextval('#{seq}'); + END IF; - CREATE OR REPLACE FUNCTION #{current}_ins() RETURNS TRIGGER AS $$ - BEGIN - INSERT INTO #{history} ( #{fields_with_pk}, valid_from ) - VALUES ( #{values_with_pk}, timezone('UTC', now()) ); - RETURN NULL; - END; - $$ LANGUAGE plpgsql; + INSERT INTO #{current} ( #{pk}, #{fields} ) + VALUES ( NEW.#{pk}, #{values} ); - DROP TRIGGER IF EXISTS history_ins ON #{current}; + INSERT INTO #{history} ( #{pk}, #{fields}, validity ) + VALUES ( NEW.#{pk}, #{values}, tsrange(timezone('UTC', now()), NULL) ); - CREATE TRIGGER history_ins AFTER INSERT ON #{current} - FOR EACH ROW EXECUTE PROCEDURE #{current}_ins(); - SQL - else - execute <<-SQL - CREATE RULE #{table}_ins AS ON INSERT TO #{table} DO INSTEAD ( + RETURN NEW; + END; + $$ LANGUAGE plpgsql; - INSERT INTO #{current} ( #{fields_with_pk} ) VALUES ( #{values_with_pk} ); + DROP TRIGGER IF EXISTS chronomodel_insert ON #{table}; - INSERT INTO #{history} ( #{fields_with_pk}, valid_from ) - VALUES ( #{values_with_pk}, timezone('UTC', now()) ) - RETURNING #{fields_with_pk}, xmin - ) - SQL - end + CREATE TRIGGER chronomodel_insert INSTEAD OF INSERT ON #{table} + FOR EACH ROW EXECUTE PROCEDURE chronomodel_#{table}_insert(); + SQL # UPDATE - set the last history entry validity to now, save the current data # in a new history entry and update the temporal table with the new data. - - # If this is the first statement of a transaction, inferred by the last - # transaction ID that updated the row, create a new row in the history. # - # The current transaction ID is returned by txid_current() as a 64-bit - # signed integer, while the last transaction ID that changed a row is - # stored into a 32-bit unsigned integer in the __xid column. As XIDs - # wrap over time, txid_current() adds an "epoch" counter in the most - # significant bits (http://bit.ly/W2Srt7) of the int - thus here we - # remove it by and'ing with 2^32-1. + # If there are no changes, this trigger suppresses redundant updates. # - # XID are 32-bit unsigned integers, and by design cannot be casted nor - # compared to anything else, adding a CAST or an operator requires - # super-user privileges, so here we do a double-cast from varchar to - # int8, to finally compare it with the current XID. We're using 64bit - # integers as in PG there is no 32-bit unsigned data type. + # If a row in the history with the current ID and current timestamp already + # exists, update it with new data. This logic makes possible to "squash" + # together changes made in a transaction in a single history row. # execute <<-SQL - CREATE RULE #{table}_upd_first AS ON UPDATE TO #{table} - WHERE old.__xid::char(10)::int8 <> (txid_current() & (2^32-1)::int8) - DO INSTEAD ( + CREATE OR REPLACE FUNCTION chronomodel_#{table}_update() RETURNS TRIGGER AS $$ + DECLARE _now timestamp; + DECLARE _hid integer; + DECLARE _old record; + DECLARE _new record; + BEGIN + IF OLD IS NOT DISTINCT FROM NEW THEN + RETURN NULL; + END IF; - UPDATE #{history} SET valid_to = timezone('UTC', now()) - WHERE #{pk} = old.#{pk} AND valid_to = '9999-12-31'; + _old := row(#{journal.map {|c| "OLD.#{c}" }.join(', ')}); + _new := row(#{journal.map {|c| "NEW.#{c}" }.join(', ')}); - INSERT INTO #{history} ( #{pk}, #{fields}, valid_from ) - VALUES ( old.#{pk}, #{values}, timezone('UTC', now()) ); + IF _old IS NOT DISTINCT FROM _new THEN + UPDATE ONLY #{current} SET ( #{fields} ) = ( #{values} ) WHERE #{pk} = OLD.#{pk}; + RETURN NEW; + END IF; - UPDATE ONLY #{current} SET #{updates} - WHERE #{pk} = old.#{pk} - ) - SQL + _now := timezone('UTC', now()); + _hid := NULL; - # Else, update the already present history row with new data. This logic - # makes possible to "squash" together changes made in a transaction in a - # single history row, assuring timestamps consistency. - # - execute <<-SQL - CREATE RULE #{table}_upd_next AS ON UPDATE TO #{table} DO INSTEAD ( - UPDATE #{history} SET #{updates} - WHERE #{pk} = old.#{pk} AND valid_from = timezone('UTC', now()); + SELECT hid INTO _hid FROM #{history} WHERE #{pk} = OLD.#{pk} AND lower(validity) = _now; - UPDATE ONLY #{current} SET #{updates} - WHERE #{pk} = old.#{pk} - ) + IF _hid IS NOT NULL THEN + UPDATE #{history} SET ( #{fields} ) = ( #{values} ) WHERE hid = _hid; + ELSE + UPDATE #{history} SET validity = tsrange(lower(validity), _now) + WHERE #{pk} = OLD.#{pk} AND upper_inf(validity); + + INSERT INTO #{history} ( #{pk}, #{fields}, validity ) + VALUES ( OLD.#{pk}, #{values}, tsrange(_now, NULL) ); + END IF; + + UPDATE ONLY #{current} SET ( #{fields} ) = ( #{values} ) WHERE #{pk} = OLD.#{pk}; + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + + DROP TRIGGER IF EXISTS chronomodel_update ON #{table}; + + CREATE TRIGGER chronomodel_update INSTEAD OF UPDATE ON #{table} + FOR EACH ROW EXECUTE PROCEDURE chronomodel_#{table}_update(); SQL # DELETE - save the current data in the history and eventually delete the # data from the temporal table. # The first DELETE is required to remove history for records INSERTed and # DELETEd in the same transaction. # execute <<-SQL - CREATE RULE #{table}_del AS ON DELETE TO #{table} DO INSTEAD ( + CREATE OR REPLACE FUNCTION chronomodel_#{table}_delete() RETURNS TRIGGER AS $$ + DECLARE _now timestamp; + BEGIN + _now := timezone('UTC', now()); - DELETE FROM #{history} - WHERE #{pk} = old.#{pk} - AND valid_from = timezone('UTC', now()) - AND valid_to = '9999-12-31'; + DELETE FROM #{history} + WHERE #{pk} = old.#{pk} AND validity = tsrange(_now, NULL); - UPDATE #{history} SET valid_to = timezone('UTC', now()) - WHERE #{pk} = old.#{pk} AND valid_to = '9999-12-31'; + UPDATE #{history} SET validity = tsrange(lower(validity), _now) + WHERE #{pk} = old.#{pk} AND upper_inf(validity); - DELETE FROM ONLY #{current} - WHERE #{current}.#{pk} = old.#{pk} - ) + DELETE FROM ONLY #{current} + WHERE #{pk} = old.#{pk}; + + RETURN OLD; + END; + $$ LANGUAGE plpgsql; + + DROP TRIGGER IF EXISTS chronomodel_delete ON #{table}; + + CREATE TRIGGER chronomodel_delete INSTEAD OF DELETE ON #{table} + FOR EACH ROW EXECUTE PROCEDURE chronomodel_#{table}_delete(); SQL end # In destructive changes, such as removing columns or changing column # types, the view must be dropped and recreated, while the change has # to be applied to the table in the temporal schema. # def chrono_alter(table_name) transaction do + options = chrono_metadata_for(table_name) + execute "DROP VIEW #{table_name}" _on_temporal_schema { yield } - # Recreate the rules - chrono_create_view_for(table_name) + # Recreate the triggers + chrono_create_view_for(table_name, options) end end # Generic alteration of history tables, where changes have to be # propagated both on the temporal table and the history one.