lib/chrono_model/adapter.rb in chrono_model-0.5.3 vs lib/chrono_model/adapter.rb in chrono_model-0.8.0
- old
+ new
@@ -12,14 +12,19 @@
TEMPORAL_SCHEMA = 'temporal'
# The schema holding historical data
HISTORY_SCHEMA = 'history'
- # Chronomodel is supported starting with PostgreSQL >= 9.0
+ # This is the data type used for the SCD2 validity
+ RANGE_TYPE = 'tsrange'
+
+ # Returns true if the connection adapter supports our
+ # implementation of temporal tables. Currently, ChronoModel
+ # is supported starting with PostgreSQL 9.3.
#
def chrono_supported?
- postgresql_version >= 90000
+ postgresql_version >= 90300
end
# Creates the given table, possibly creating the temporal schema
# objects if the `:temporal` option is given and set to true.
#
@@ -32,18 +37,15 @@
logger.warn "WARNING - Creating a \"__chrono_id\" primary key to fulfill the requirement"
options[:id] = '__chrono_id'
end
- # Create required schemas
- chrono_create_schemas!
-
transaction do
_on_temporal_schema { super }
_on_history_schema { chrono_create_history_for(table_name) }
- chrono_create_view_for(table_name)
+ chrono_create_view_for(table_name, options)
TableCache.add! table_name
end
end
@@ -53,20 +55,49 @@
return super unless is_chrono?(name)
clear_cache!
transaction do
+ # Rename tables
+ #
[TEMPORAL_SCHEMA, HISTORY_SCHEMA].each do |schema|
on_schema(schema) do
seq = serial_sequence(name, primary_key(name))
new_seq = seq.sub(name.to_s, new_name.to_s).split('.').last
execute "ALTER SEQUENCE #{seq} RENAME TO #{new_seq}"
execute "ALTER TABLE #{name} RENAME TO #{new_name}"
end
end
+ # Rename indexes
+ #
+ pkey = primary_key(new_name)
+ _on_history_schema do
+ standard_index_names = %w(
+ inherit_pkey instance_history pkey
+ recorded_at timeline_consistency )
+
+ old_names = temporal_index_names(name, :validity) +
+ standard_index_names.map {|i| [name, i].join('_') }
+
+ new_names = temporal_index_names(new_name, :validity) +
+ standard_index_names.map {|i| [new_name, i].join('_') }
+
+ old_names.zip(new_names).each do |old, new|
+ execute "ALTER INDEX #{old} RENAME TO #{new}"
+ end
+ end
+
+ # Rename functions
+ #
+ %w( insert update delete ).each do |func|
+ execute "ALTER FUNCTION chronomodel_#{name}_#{func}() RENAME TO chronomodel_#{new_name}_#{func}"
+ end
+
+ # Rename the public view
+ #
execute "ALTER VIEW #{name} RENAME TO #{new_name}"
TableCache.del! name
TableCache.add! new_name
end
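Taken together, renaming a hypothetical chrono-enabled countries table to nations would issue statements roughly like the following (a sketch, not part of the adapter; object names follow the conventions above):

ActiveRecord::Base.connection.rename_table :countries, :nations
# in both the temporal and history schemas:
#   ALTER SEQUENCE countries_id_seq RENAME TO nations_id_seq
#   ALTER TABLE countries RENAME TO nations
# in the history schema, for each standard and temporal index:
#   ALTER INDEX countries_pkey RENAME TO nations_pkey
# for each of insert / update / delete:
#   ALTER FUNCTION chronomodel_countries_insert() RENAME TO chronomodel_nations_insert
# and finally:
#   ALTER VIEW countries RENAME TO nations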
@@ -94,11 +125,12 @@
execute "ALTER TABLE #{table_name} ADD __chrono_id SERIAL PRIMARY KEY"
end
execute "ALTER TABLE #{table_name} SET SCHEMA #{TEMPORAL_SCHEMA}"
_on_history_schema { chrono_create_history_for(table_name) }
- chrono_create_view_for(table_name)
+ chrono_create_view_for(table_name, options)
+ copy_indexes_to_history_for(table_name)
TableCache.add! table_name
# Optionally copy the plain table data, setting up history
# retroactively.
@@ -108,14 +140,13 @@
from = options[:validity] || '0001-01-01 00:00:00'
execute %[
INSERT INTO #{HISTORY_SCHEMA}.#{table_name}
SELECT *,
- nextval('#{seq}') AS hid,
- timestamp '#{from}' AS valid_from,
- timestamp '9999-12-31 00:00:00' AS valid_to,
- timezone('UTC', now()) AS recorded_at
+ nextval('#{seq}') AS hid,
+ tsrange('#{from}', NULL) AS validity,
+ timezone('UTC', now()) AS recorded_at
FROM #{TEMPORAL_SCHEMA}.#{table_name}
]
end
end
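The backfilled rows carry an open-ended validity. A minimal sketch (table name illustrative) of how still-current history rows can be told apart, namely by their infinite upper bound:

conn = ActiveRecord::Base.connection
conn.select_value <<-SQL
  SELECT COUNT(*) FROM history.countries WHERE upper_inf(validity)
SQL
# counts the rows whose validity is tsrange(..., NULL), i.e. still current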
@@ -123,11 +154,17 @@
else
if options[:temporal] == false && is_chrono?(table_name)
# Remove temporal features from this table
#
execute "DROP VIEW #{table_name}"
- _on_temporal_schema { execute "DROP FUNCTION IF EXISTS #{table_name}_ins() CASCADE" }
+
+ _on_temporal_schema do
+ %w( insert update delete ).each do |func|
+ execute "DROP FUNCTION IF EXISTS #{table_name}_#{func}() CASCADE"
+ end
+ end
+
_on_history_schema { execute "DROP TABLE #{table_name}" }
default_schema = select_value 'SELECT current_schema()'
_on_temporal_schema do
if primary_key(table_name) == '__chrono_id'
@@ -144,11 +181,11 @@
end
end
end
# If dropping a temporal table, drops it from the temporal schema
- # adding the CASCADE option so to delete the history, view and rules.
+ # adding the CASCADE option so as to delete the history, view and triggers.
#
def drop_table(table_name, *)
return super unless is_chrono?(table_name)
_on_temporal_schema { execute "DROP TABLE #{table_name} CASCADE" }
@@ -184,43 +221,43 @@
_on_history_schema { super }
end
end
# If adding a column to a temporal table, creates it in the table in
- # the temporal schema and updates the view rules.
+ # the temporal schema and updates the triggers.
#
def add_column(table_name, *)
return super unless is_chrono?(table_name)
transaction do
# Add the column to the temporal table
_on_temporal_schema { super }
- # Update the rules
+ # Update the triggers
chrono_create_view_for(table_name)
end
end
# If renaming a column of a temporal table, rename it in the table in
- # the temporal schema and update the view rules.
+ # the temporal schema and update the triggers.
#
def rename_column(table_name, *)
return super unless is_chrono?(table_name)
# Rename the column in the temporal table and in the view
transaction do
_on_temporal_schema { super }
super
- # Update the rules
+ # Update the triggers
chrono_create_view_for(table_name)
end
end
# If changing a column of a temporal table, we are forced to drop the
# view, then change the column on the table in the temporal schema and
- # eventually recreate the rules.
+ # finally recreate the triggers.
#
def change_column(table_name, *)
return super unless is_chrono?(table_name)
chrono_alter(table_name) { super }
end
@@ -239,11 +276,11 @@
_on_temporal_schema { super }
end
# If removing a column from a temporal table, we are forced to drop the
# view, then drop the column from the table in the temporal schema and
- # eventually recreate the rules.
+ # finally recreate the triggers.
#
def remove_column(table_name, *)
return super unless is_chrono?(table_name)
chrono_alter(table_name) { super }
end
@@ -262,130 +299,83 @@
return super(table_name) unless is_chrono?(table_name)
_on_temporal_schema(false) { super(table_name) }
end
end
- # Create spatial indexes for timestamp search. Conceptually identical
- # to the EXCLUDE constraint in chrono_create_history_for, but without
- # the millisecond skew.
+ # Create GiST indexes for timestamp search.
#
# This index is used by +TimeMachine.at+, `.current` and `.past` to
# build the temporal WHERE clauses that fetch the state of records at
# a single point in time.
#
# Parameters:
#
# `table`: the table to create indexes on
- # `from` : the starting timestamp field
- # `to` : the ending timestamp field
+ # `range`: the tsrange field
#
# Options:
#
# `:name`: the index name prefix, defaults to
- # {table_name}_temporal_{snapshot / from_col / to_col}
+ # index_{table}_temporal_on_{range / lower_range / upper_range}
#
- def add_temporal_indexes(table, from, to, options = {})
- snapshot_idx, from_idx, to_idx =
- temporal_index_names(table, from, to, options)
+ def add_temporal_indexes(table, range, options = {})
+ range_idx, lower_idx, upper_idx =
+ temporal_index_names(table, range, options)
chrono_alter_index(table, options) do
execute <<-SQL
- CREATE INDEX #{snapshot_idx} ON #{table} USING gist (
- box(
- point( date_part( 'epoch', #{from} ), 0 ),
- point( date_part( 'epoch', #{to } ), 0 )
- )
- )
+ CREATE INDEX #{range_idx} ON #{table} USING gist ( #{range} )
SQL
# Indexes used for precise history filtering, sorting and, in history
- # tables, by UPDATE / DELETE rules
+ # tables, by UPDATE / DELETE triggers.
#
- execute "CREATE INDEX #{from_idx} ON #{table} ( #{from} )"
- execute "CREATE INDEX #{to_idx } ON #{table} ( #{to } )"
+ execute "CREATE INDEX #{lower_idx} ON #{table} ( lower(#{range}) )"
+ execute "CREATE INDEX #{upper_idx} ON #{table} ( upper(#{range}) )"
end
end
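For example (a sketch, assuming a countries history table with a validity column), the call below creates one GiST index on the whole range plus two btree indexes on its bounds, following the naming scheme documented above:

add_temporal_indexes 'countries', :validity, :on_current_schema => true
# issues, approximately:
#   CREATE INDEX index_countries_temporal_on_validity ON countries USING gist ( validity )
#   CREATE INDEX index_countries_temporal_on_lower_validity ON countries ( lower(validity) )
#   CREATE INDEX index_countries_temporal_on_upper_validity ON countries ( upper(validity) )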
- def remove_temporal_indexes(table, from, to, options = {})
- indexes = temporal_index_names(table, from, to, options)
+ def remove_temporal_indexes(table, range, options = {})
+ indexes = temporal_index_names(table, range, options)
chrono_alter_index(table, options) do
indexes.each {|idx| execute "DROP INDEX #{idx}" }
end
end
- def temporal_index_names(table, from, to, options)
- prefix = options[:name].presence || "#{table}_temporal"
+ def temporal_index_names(table, range, options = {})
+ prefix = options[:name].presence || "index_#{table}_temporal"
- ["#{from}_and_#{to}", from, to].map do |suffix|
+ # When creating computed indexes (e.g. ends_on::timestamp + time
+ # '23:59:59'), remove everything following the field name.
+ range = range.to_s.sub(/\W.*/, '')
+
+ [range, "lower_#{range}", "upper_#{range}"].map do |suffix|
[prefix, 'on', suffix].join('_')
end
end
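To illustrate the name generation and the stripping of computed expressions (inputs hypothetical):

temporal_index_names('countries', :validity)
# => ["index_countries_temporal_on_validity",
#     "index_countries_temporal_on_lower_validity",
#     "index_countries_temporal_on_upper_validity"]

temporal_index_names('events', "ends_on::timestamp + time '23:59:59'")
# the sub(/\W.*/, '') above keeps only "ends_on", as if :ends_on were given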
- # Adds a CHECK constraint to the given +table+, to assure that
- # the value contained in the +from+ field is, by default, less
- # than the value contained in the +to+ field.
- #
- # The default `<` operator can be changed using the `:op` option.
- #
- def add_from_before_to_constraint(table, from, to, options = {})
- operator = options[:op].presence || '<'
- name = from_before_to_constraint_name(table, from, to)
-
- chrono_alter_constraint(table, options) do
- execute <<-SQL
- ALTER TABLE #{table} ADD CONSTRAINT
- #{name} CHECK (#{from} #{operator} #{to})
- SQL
- end
- end
-
- def remove_from_before_to_constraint(table, from, to, options = {})
- name = from_before_to_constraint_name(table, from, to)
-
- chrono_alter_constraint(table, options) do
- execute <<-SQL
- ALTER TABLE #{table} DROP CONSTRAINT #{name}
- SQL
- end
- end
-
- def from_before_to_constraint_name(table, from, to)
- "#{table}_#{from}_before_#{to}"
- end
-
-
# Adds an EXCLUDE constraint to the given table, to ensure that
# no more than one record can occupy a definite segment on a
# timeline.
#
- # The exclusion is implemented using a spatial gist index, that
- # uses boxes under the hood. Different records are identified by
- # default using the table's primary key - or you can specify your
- # field (or composite field) using the `:id` option.
- #
- def add_timeline_consistency_constraint(table, from, to, options = {})
+ def add_timeline_consistency_constraint(table, range, options = {})
name = timeline_consistency_constraint_name(table)
- id = options[:id] || primary_key(table)
+ id = options[:id] || primary_key(table)
chrono_alter_constraint(table, options) do
execute <<-SQL
- ALTER TABLE #{table} ADD CONSTRAINT
- #{name} EXCLUDE USING gist (
- box(
- point( date_part( 'epoch', #{from} ), #{id} ),
- point( date_part( 'epoch', #{to } - INTERVAL '1 msec' ), #{id} )
- )
- WITH &&
- )
+ ALTER TABLE #{table} ADD CONSTRAINT #{name}
+ EXCLUDE USING gist ( #{id} WITH =, #{range} WITH && )
SQL
end
end
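A sketch of what the exclusion enforces (data illustrative): two history rows for the same id cannot overlap in validity, while rows with different ids may. Note that the `id WITH =` gist operator presumes the btree_gist extension is available.

conn = ActiveRecord::Base.connection
conn.execute <<-SQL
  INSERT INTO history.countries ( id, name, validity )
  VALUES ( 1, 'Italy', tsrange('2014-01-01', '2014-06-01') )
SQL
conn.execute <<-SQL
  INSERT INTO history.countries ( id, name, validity )
  VALUES ( 1, 'Italia', tsrange('2014-03-01', NULL) )
SQL
# the second INSERT fails: same id, overlapping validity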
- def remove_timeline_consistency_constraint(table, from, to, options = {})
- name = timeline_consistency_constraint_name(table)
+ def remove_timeline_consistency_constraint(table, options = {})
+ name = timeline_consistency_constraint_name(options[:prefix] || table)
+
chrono_alter_constraint(table, options) do
execute <<-SQL
ALTER TABLE #{table} DROP CONSTRAINT #{name}
SQL
end
@@ -429,11 +419,11 @@
end
end
@_on_schema_nesting -= 1
end
- TableCache = (Class.new(HashWithIndifferentAccess) do
+ TableCache = (Class.new(Hash) do
def all ; keys ; end
def add! table ; self[table.to_s] = true ; end
def del! table ; self[table.to_s] = nil ; end
def fetch table ; self[table.to_s] ||= yield ; end
end).new
@@ -443,220 +433,364 @@
def is_chrono?(table)
TableCache.fetch(table) do
_on_temporal_schema { table_exists?(table) } &&
_on_history_schema { table_exists?(table) }
end
+
+ rescue ActiveRecord::StatementInvalid => e
+ # This means we could not change the search path to check for
+ # table existence.
+ if is_exception_class?(e, PG::InvalidSchemaName, PG::InvalidParameterValue)
+ return false
+ else
+ raise e
+ end
end
- def chrono_create_schemas!
- [TEMPORAL_SCHEMA, HISTORY_SCHEMA].each do |schema|
- execute "CREATE SCHEMA #{schema}" unless schema_exists?(schema)
+ def is_exception_class?(e, *klasses)
+ if e.respond_to?(:original_exception)
+ klasses.any? { |k| e.original_exception.is_a?(k) }
+ else
+ klasses.any? { |k| e.message =~ /#{k.name}/ }
end
end
- # Disable savepoints support, as they break history keeping.
- # http://archives.postgresql.org/pgsql-hackers/2012-08/msg01094.php
+ # HACK: Redefine tsrange parsing support, as it is currently broken.
#
- def supports_savepoints?
- false
+ # This self-made API is here because currently AR4 does not support
+ # open-ended ranges. The reasons are poor support in Ruby:
+ #
+ # https://bugs.ruby-lang.org/issues/6864
+ #
+ # and an unstable interface in Active Record:
+ #
+ # https://github.com/rails/rails/issues/13793
+ # https://github.com/rails/rails/issues/14010
+ #
+ # so, for now, we are implementing our own.
+ #
+ class TSRange < OID::Type
+ def extract_bounds(value)
+ from, to = value[1..-2].split(',')
+ {
+ from: (value[1] == ',' || from == '-infinity') ? nil : from[1..-2],
+ to: (value[-2] == ',' || to == 'infinity') ? nil : to[1..-2],
+ #exclude_start: (value[0] == '('),
+ #exclude_end: (value[-1] == ')')
+ }
+ end
+
+ def type_cast(value)
+ extracted = extract_bounds(value)
+
+ from = Conversions.string_to_utc_time extracted[:from]
+ to = Conversions.string_to_utc_time extracted[:to ]
+
+ [from, to]
+ end
end
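A sketch of the parser at work on PostgreSQL's textual tsrange output (values illustrative); a missing upper bound becomes nil, which is how open-ended validity reaches Ruby:

range = TSRange.new
range.extract_bounds '["2014-01-01 00:00:00","2014-06-01 00:00:00")'
# => { from: '2014-01-01 00:00:00', to: '2014-06-01 00:00:00' }

range.extract_bounds '["2014-01-01 00:00:00",)'
# => { from: '2014-01-01 00:00:00', to: nil }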
- def create_savepoint; end
- def rollback_to_savepoint; end
- def release_savepoint; end
+ def chrono_setup!
+ chrono_create_schemas
+ chrono_setup_type_map
+ chrono_upgrade_structure!
+ end
+
+ # Copy the indexes from the temporal table to the history table, unless
+ # indexes with the same name already exist there.
+ #
+ # Uniqueness is deliberately ignored, as it doesn't make sense on history
+ # tables.
+ #
+ # Ref: GitHub pull #21.
+ #
+ def copy_indexes_to_history_for(table_name)
+ history_indexes = _on_history_schema { indexes(table_name) }.map(&:name)
+ temporal_indexes = _on_temporal_schema { indexes(table_name) }
+
+ temporal_indexes.each do |index|
+ next if history_indexes.include?(index.name)
+
+ _on_history_schema do
+ execute %[
+ CREATE INDEX #{index.name} ON #{table_name}
+ USING #{index.using} ( #{index.columns.join(', ')} )
+ ], 'Copy index from temporal to history'
+ end
+ end
+ end
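A sketch of the effect (index name hypothetical): an index existing on the temporal table is recreated with the same name on the history table, minus any UNIQUE attribute:

# temporal.countries has: CREATE UNIQUE INDEX index_countries_on_code ( code )
copy_indexes_to_history_for('countries')
# history.countries gains: CREATE INDEX index_countries_on_code ( code )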
+
private
+ # Create the temporal and history schemas, unless they already exist
+ #
+ def chrono_create_schemas
+ [TEMPORAL_SCHEMA, HISTORY_SCHEMA].each do |schema|
+ execute "CREATE SCHEMA #{schema}" unless schema_exists?(schema)
+ end
+ end
+
+ # Adds the above TSRange class to the PG Adapter OID::TYPE_MAP
+ #
+ def chrono_setup_type_map
+ OID::TYPE_MAP[3908] = TSRange.new
+ end
+
+ # Upgrades existing structure for each table, if required.
+ # TODO: allow upgrades from pre-0.6 structure with box() and stuff.
+ #
+ def chrono_upgrade_structure!
+ transaction do
+ current = VERSION
+
+ _on_temporal_schema { tables }.each do |table_name|
+ next unless is_chrono?(table_name)
+ metadata = chrono_metadata_for(table_name)
+ version = metadata['chronomodel']
+
+ if version.blank? # FIXME
+ raise Error, "ChronoModel found structures created by a too old version. Cannot upgrade right now."
+ end
+
+ next if version == current
+
+ logger.info "ChronoModel: upgrading #{table_name} from #{version} to #{current}"
+ chrono_create_view_for(table_name)
+ logger.info "ChronoModel: upgrade complete"
+ end
+ end
+ end
+
+ def chrono_metadata_for(table)
+ comment = select_value(
+ "SELECT obj_description(#{quote(table)}::regclass)",
+ "ChronoModel metadata for #{table}") if table_exists?(table)
+
+ MultiJson.load(comment || '{}').with_indifferent_access
+ end
+
+ def chrono_metadata_set(table, metadata)
+ comment = MultiJson.dump(metadata)
+
+ execute %[
+ COMMENT ON VIEW #{table} IS #{quote(comment)}
+ ]
+ end
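The metadata round-trips through the view's comment as JSON; a sketch (table and keys illustrative):

chrono_metadata_set('countries', :chronomodel => VERSION, :journal => %w( name ))
# => COMMENT ON VIEW countries IS '{"chronomodel":"0.8.0","journal":["name"]}'

chrono_metadata_for('countries')
# => { "chronomodel" => "0.8.0", "journal" => ["name"] }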
+
+ def add_history_validity_constraint(table, pkey)
+ add_timeline_consistency_constraint(table, :validity, :id => pkey, :on_current_schema => true)
+ end
+
+ def remove_history_validity_constraint(table, options = {})
+ remove_timeline_consistency_constraint(table, options.merge(:on_current_schema => true))
+ end
+
# Create the history table in the history schema
def chrono_create_history_for(table)
parent = "#{TEMPORAL_SCHEMA}.#{table}"
p_pkey = primary_key(parent)
execute <<-SQL
CREATE TABLE #{table} (
hid SERIAL PRIMARY KEY,
- valid_from timestamp NOT NULL,
- valid_to timestamp NOT NULL DEFAULT '9999-12-31',
+ validity #{RANGE_TYPE} NOT NULL,
recorded_at timestamp NOT NULL DEFAULT timezone('UTC', now())
) INHERITS ( #{parent} )
SQL
- add_from_before_to_constraint(table, :valid_from, :valid_to,
- :on_current_schema => true)
+ add_history_validity_constraint(table, p_pkey)
- add_timeline_consistency_constraint(table, :valid_from, :valid_to,
- :id => p_pkey, :on_current_schema => true)
-
- # Inherited primary key
- execute "CREATE INDEX #{table}_inherit_pkey ON #{table} ( #{p_pkey} )"
-
chrono_create_history_indexes_for(table, p_pkey)
end
def chrono_create_history_indexes_for(table, p_pkey = nil)
# Duplicate because of Migrate.upgrade_indexes_for
# TODO remove me.
p_pkey ||= primary_key("#{TEMPORAL_SCHEMA}.#{table}")
- add_temporal_indexes table, :valid_from, :valid_to,
- :on_current_schema => true
+ add_temporal_indexes table, :validity, :on_current_schema => true
+ execute "CREATE INDEX #{table}_inherit_pkey ON #{table} ( #{p_pkey} )"
execute "CREATE INDEX #{table}_recorded_at ON #{table} ( recorded_at )"
execute "CREATE INDEX #{table}_instance_history ON #{table} ( #{p_pkey}, recorded_at )"
end
- # Create the public view and its rewrite rules
+ # Create the public view and its INSTEAD OF triggers
#
- def chrono_create_view_for(table)
+ def chrono_create_view_for(table, options = nil)
pk = primary_key(table)
current = [TEMPORAL_SCHEMA, table].join('.')
history = [HISTORY_SCHEMA, table].join('.')
+ seq = serial_sequence(current, pk)
+ options ||= chrono_metadata_for(table)
+
# SELECT - return only current data
#
execute "DROP VIEW #{table}" if table_exists? table
- execute "CREATE VIEW #{table} AS SELECT *, xmin AS __xid FROM ONLY #{current}"
+ execute "CREATE VIEW #{table} AS SELECT * FROM ONLY #{current}"
- columns = columns(table).map{|c| quote_column_name(c.name)}
+ # Set default values on the view (closes #12)
+ #
+ chrono_metadata_set(table, options.merge(:chronomodel => VERSION))
+
+ columns(table).each do |column|
+ default = column.default.nil? ? column.default_function : quote(column.default, column)
+ next if column.name == pk || default.nil?
+
+ execute "ALTER VIEW #{table} ALTER COLUMN #{column.name} SET DEFAULT #{default}"
+ end
+
+ columns = columns(table).map {|c| quote_column_name(c.name)}
columns.delete(quote_column_name(pk))
- updates = columns.map {|c| "#{c} = new.#{c}"}.join(",\n")
+ fields, values = columns.join(', '), columns.map {|c| "NEW.#{c}"}.join(', ')
- fields, values = columns.join(', '), columns.map {|c| "new.#{c}"}.join(', ')
- fields_with_pk, values_with_pk = "#{pk}, " << fields, "new.#{pk}, " << values
+ # Columns to be journaled. By default everything except updated_at (GH #7)
+ #
+ journal = if options[:journal]
+ options[:journal].map {|col| quote_column_name(col)}
+ elsif options[:no_journal]
+ columns - options[:no_journal].map {|col| quote_column_name(col)}
+
+ elsif options[:full_journal]
+ columns
+
+ else
+ columns - [ quote_column_name('updated_at') ]
+ end
+
+ journal &= columns
+
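For reference, a sketch of how these options might be provided at table creation (column names hypothetical; semantics as read above):

create_table :countries, :temporal => true, :journal => %w( name )
# only "name" is compared when deciding whether a new history row is due

create_table :countries, :temporal => true, :no_journal => %w( counter_cache )
# everything except "counter_cache" is journaled

create_table :countries, :temporal => true
# default: everything except updated_at (GH #7)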
# INSERT - insert data both in the temporal table and in the history one.
#
- # A trigger is required if there is a serial ID column, as rules by
- # design cannot handle the following case:
+ # The serial sequence is invoked manually only if the PK is NULL, to
+ # allow setting the PK to a specific value (think migration scenario).
#
- # * INSERT INTO ... SELECT: if using currval(), all the rows
- # inserted in the history will have the same identity value;
- #
- # * if using a separate sequence to solve the above case, it may go
- # out of sync with the main one if an INSERT statement fails due
- # to a table constraint (the first one is nextval()'ed but the
- # nextval() on the history one never happens)
- #
- # So, only for this case, we resort to an AFTER INSERT FOR EACH ROW trigger.
- #
- # Ref: GH Issue #4.
- #
- if serial_sequence(current, pk).present?
- execute <<-SQL
- CREATE RULE #{table}_ins AS ON INSERT TO #{table} DO INSTEAD (
- INSERT INTO #{current} ( #{fields} ) VALUES ( #{values} )
- RETURNING #{pk}, #{fields}, xmin
- );
+ execute <<-SQL
+ CREATE OR REPLACE FUNCTION chronomodel_#{table}_insert() RETURNS TRIGGER AS $$
+ BEGIN
+ IF NEW.#{pk} IS NULL THEN
+ NEW.#{pk} := nextval('#{seq}');
+ END IF;
- CREATE OR REPLACE FUNCTION #{current}_ins() RETURNS TRIGGER AS $$
- BEGIN
- INSERT INTO #{history} ( #{fields_with_pk}, valid_from )
- VALUES ( #{values_with_pk}, timezone('UTC', now()) );
- RETURN NULL;
- END;
- $$ LANGUAGE plpgsql;
+ INSERT INTO #{current} ( #{pk}, #{fields} )
+ VALUES ( NEW.#{pk}, #{values} );
- DROP TRIGGER IF EXISTS history_ins ON #{current};
+ INSERT INTO #{history} ( #{pk}, #{fields}, validity )
+ VALUES ( NEW.#{pk}, #{values}, tsrange(timezone('UTC', now()), NULL) );
- CREATE TRIGGER history_ins AFTER INSERT ON #{current}
- FOR EACH ROW EXECUTE PROCEDURE #{current}_ins();
- SQL
- else
- execute <<-SQL
- CREATE RULE #{table}_ins AS ON INSERT TO #{table} DO INSTEAD (
+ RETURN NEW;
+ END;
+ $$ LANGUAGE plpgsql;
- INSERT INTO #{current} ( #{fields_with_pk} ) VALUES ( #{values_with_pk} );
+ DROP TRIGGER IF EXISTS chronomodel_insert ON #{table};
- INSERT INTO #{history} ( #{fields_with_pk}, valid_from )
- VALUES ( #{values_with_pk}, timezone('UTC', now()) )
- RETURNING #{fields_with_pk}, xmin
- )
- SQL
- end
+ CREATE TRIGGER chronomodel_insert INSTEAD OF INSERT ON #{table}
+ FOR EACH ROW EXECUTE PROCEDURE chronomodel_#{table}_insert();
+ SQL
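A sketch of the resulting behaviour (data illustrative): inserting through the view works both with and without an explicit primary key, the latter being useful in migrations:

conn = ActiveRecord::Base.connection
conn.execute "INSERT INTO countries ( name ) VALUES ( 'Italy' )"
# NEW.id was NULL, so the trigger nextval()'s the sequence

conn.execute "INSERT INTO countries ( id, name ) VALUES ( 42, 'Narnia' )"
# NEW.id is kept as given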
# UPDATE - set the last history entry validity to now, save the current data
# in a new history entry and update the temporal table with the new data.
-
- # If this is the first statement of a transaction, inferred by the last
- # transaction ID that updated the row, create a new row in the history.
#
- # The current transaction ID is returned by txid_current() as a 64-bit
- # signed integer, while the last transaction ID that changed a row is
- # stored into a 32-bit unsigned integer in the __xid column. As XIDs
- # wrap over time, txid_current() adds an "epoch" counter in the most
- # significant bits (http://bit.ly/W2Srt7) of the int - thus here we
- # remove it by and'ing with 2^32-1.
+ # If there are no changes, this trigger suppresses redundant updates.
#
- # XID are 32-bit unsigned integers, and by design cannot be casted nor
- # compared to anything else, adding a CAST or an operator requires
- # super-user privileges, so here we do a double-cast from varchar to
- # int8, to finally compare it with the current XID. We're using 64bit
- # integers as in PG there is no 32-bit unsigned data type.
+ # If a row in the history with the current ID and current timestamp already
+ # exists, update it with new data. This logic makes it possible to "squash"
+ # together changes made in a transaction into a single history row.
#
execute <<-SQL
- CREATE RULE #{table}_upd_first AS ON UPDATE TO #{table}
- WHERE old.__xid::char(10)::int8 <> (txid_current() & (2^32-1)::int8)
- DO INSTEAD (
+ CREATE OR REPLACE FUNCTION chronomodel_#{table}_update() RETURNS TRIGGER AS $$
+ DECLARE _now timestamp;
+ DECLARE _hid integer;
+ DECLARE _old record;
+ DECLARE _new record;
+ BEGIN
+ IF OLD IS NOT DISTINCT FROM NEW THEN
+ RETURN NULL;
+ END IF;
- UPDATE #{history} SET valid_to = timezone('UTC', now())
- WHERE #{pk} = old.#{pk} AND valid_to = '9999-12-31';
+ _old := row(#{journal.map {|c| "OLD.#{c}" }.join(', ')});
+ _new := row(#{journal.map {|c| "NEW.#{c}" }.join(', ')});
- INSERT INTO #{history} ( #{pk}, #{fields}, valid_from )
- VALUES ( old.#{pk}, #{values}, timezone('UTC', now()) );
+ IF _old IS NOT DISTINCT FROM _new THEN
+ UPDATE ONLY #{current} SET ( #{fields} ) = ( #{values} ) WHERE #{pk} = OLD.#{pk};
+ RETURN NEW;
+ END IF;
- UPDATE ONLY #{current} SET #{updates}
- WHERE #{pk} = old.#{pk}
- )
- SQL
+ _now := timezone('UTC', now());
+ _hid := NULL;
- # Else, update the already present history row with new data. This logic
- # makes possible to "squash" together changes made in a transaction in a
- # single history row, assuring timestamps consistency.
- #
- execute <<-SQL
- CREATE RULE #{table}_upd_next AS ON UPDATE TO #{table} DO INSTEAD (
- UPDATE #{history} SET #{updates}
- WHERE #{pk} = old.#{pk} AND valid_from = timezone('UTC', now());
+ SELECT hid INTO _hid FROM #{history} WHERE #{pk} = OLD.#{pk} AND lower(validity) = _now;
- UPDATE ONLY #{current} SET #{updates}
- WHERE #{pk} = old.#{pk}
- )
+ IF _hid IS NOT NULL THEN
+ UPDATE #{history} SET ( #{fields} ) = ( #{values} ) WHERE hid = _hid;
+ ELSE
+ UPDATE #{history} SET validity = tsrange(lower(validity), _now)
+ WHERE #{pk} = OLD.#{pk} AND upper_inf(validity);
+
+ INSERT INTO #{history} ( #{pk}, #{fields}, validity )
+ VALUES ( OLD.#{pk}, #{values}, tsrange(_now, NULL) );
+ END IF;
+
+ UPDATE ONLY #{current} SET ( #{fields} ) = ( #{values} ) WHERE #{pk} = OLD.#{pk};
+
+ RETURN NEW;
+ END;
+ $$ LANGUAGE plpgsql;
+
+ DROP TRIGGER IF EXISTS chronomodel_update ON #{table};
+
+ CREATE TRIGGER chronomodel_update INSTEAD OF UPDATE ON #{table}
+ FOR EACH ROW EXECUTE PROCEDURE chronomodel_#{table}_update();
SQL
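To illustrate the squashing (a sketch; now() is stable within a PostgreSQL transaction, so both statements see the same _now): the second UPDATE finds the history row opened by the first at lower(validity) = _now and rewrites it in place, leaving a single history entry for the whole transaction.

conn = ActiveRecord::Base.connection
conn.transaction do
  conn.execute "UPDATE countries SET name = 'Itay'  WHERE id = 1"
  conn.execute "UPDATE countries SET name = 'Italy' WHERE id = 1"
end
# history.countries gains one row with validity = tsrange(_now, NULL)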
# DELETE - save the current data in the history and then delete the
# data from the temporal table.
# The first DELETE is required to remove history for records INSERTed and
# DELETEd in the same transaction.
#
execute <<-SQL
- CREATE RULE #{table}_del AS ON DELETE TO #{table} DO INSTEAD (
+ CREATE OR REPLACE FUNCTION chronomodel_#{table}_delete() RETURNS TRIGGER AS $$
+ DECLARE _now timestamp;
+ BEGIN
+ _now := timezone('UTC', now());
- DELETE FROM #{history}
- WHERE #{pk} = old.#{pk}
- AND valid_from = timezone('UTC', now())
- AND valid_to = '9999-12-31';
+ DELETE FROM #{history}
+ WHERE #{pk} = old.#{pk} AND validity = tsrange(_now, NULL);
- UPDATE #{history} SET valid_to = timezone('UTC', now())
- WHERE #{pk} = old.#{pk} AND valid_to = '9999-12-31';
+ UPDATE #{history} SET validity = tsrange(lower(validity), _now)
+ WHERE #{pk} = old.#{pk} AND upper_inf(validity);
- DELETE FROM ONLY #{current}
- WHERE #{current}.#{pk} = old.#{pk}
- )
+ DELETE FROM ONLY #{current}
+ WHERE #{pk} = old.#{pk};
+
+ RETURN OLD;
+ END;
+ $$ LANGUAGE plpgsql;
+
+ DROP TRIGGER IF EXISTS chronomodel_delete ON #{table};
+
+ CREATE TRIGGER chronomodel_delete INSTEAD OF DELETE ON #{table}
+ FOR EACH ROW EXECUTE PROCEDURE chronomodel_#{table}_delete();
SQL
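And a sketch of the INSERT-then-DELETE case handled by the first statement (data illustrative): a record created and deleted within one transaction leaves no history at all, as its freshly opened history row still has validity = tsrange(_now, NULL) and is removed outright instead of being closed.

conn = ActiveRecord::Base.connection
conn.transaction do
  conn.execute "INSERT INTO countries ( name ) VALUES ( 'Atlantis' )"
  conn.execute "DELETE FROM countries WHERE name = 'Atlantis'"
end
# no trace is left in history.countries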
end
# In destructive changes, such as removing columns or changing column
# types, the view must be dropped and recreated, while the change has
# to be applied to the table in the temporal schema.
#
def chrono_alter(table_name)
transaction do
+ options = chrono_metadata_for(table_name)
+
execute "DROP VIEW #{table_name}"
_on_temporal_schema { yield }
- # Recreate the rules
- chrono_create_view_for(table_name)
+ # Recreate the triggers
+ chrono_create_view_for(table_name, options)
end
end
# Generic alteration of history tables, where changes have to be
# propagated both on the temporal table and the history one.