lib/rubyrep/logged_change.rb in rubyrep-1.0.5 vs lib/rubyrep/logged_change.rb in rubyrep-1.0.6
- old
+ new
@@ -1,199 +1,27 @@
module RR
- class Session
-
- # Returns the +LoggedChangeLoader+ of the specified database.
- # * database: either :+left+ or :+right+
- def change_loader(database)
- @change_loaders ||= {}
- unless change_loader = @change_loaders[database]
- change_loader = @change_loaders[database] = LoggedChangeLoader.new(self, database)
- end
- change_loader
- end
-
- # Forces an update of the change log cache
- def reload_changes
- change_loader(:left).update :forced => true
- change_loader(:right).update :forced => true
- end
-
- end
-
- # Caches the entries in the change log table
- class LoggedChangeLoader
-
- # The current +Session+.
- attr_accessor :session
-
- # The current +ProxyConnection+.
- attr_accessor :connection
-
- # Index to the next unprocessed change in the +change_array+.
- attr_accessor :current_index
-
- # ID of the last cached change log record.
- attr_accessor :current_id
-
- # Array with all cached changes.
- # Processed change log records are replaced with +nil+.
- attr_accessor :change_array
-
- # Tree (hash) structure for fast access to all cached changes.
- # First level of tree:
- # * key: table name
- # * value: 2nd level tree
- # 2nd level tree:
- # * key: the change_key value of the according change log records.
- # * value:
- # An array of according change log records (column_name => value hash).
- # Additional entry of each change log hash:
- # * key: 'array_index'
- # * value: index to the change log record in +change_array+
- attr_accessor :change_tree
-
- # Date of last update of the cache
- attr_accessor :last_updated
-
- # Initializes / resets the cache.
- def init_cache
- self.change_tree = {}
- self.change_array = []
- self.current_index = 0
- end
- private :init_cache
-
- # Create a new change log record cache.
- # * +session+: The current +Session+
- # * +database+: Either :+left+ or :+right+
- def initialize(session, database)
- self.session = session
- self.connection = session.send(database)
-
- init_cache
- self.current_id = -1
- self.last_updated = 1.year.ago
- end
-
- # Updates the cache.
- # Options is a hash determining when the update is actually executed:
- # * :+expire_time+: cache is older than the given number of seconds
- # * :+forced+: if +true+ update the cache even if not yet expired
- def update(options = {:forced => false, :expire_time => 1})
- return unless options[:forced] or Time.now - self.last_updated >= options[:expire_time]
-
- self.last_updated = Time.now
-
- # First, let's use a LIMIT clause (via :row_buffer_size option) to verify
- # if there are any pending changes.
- # (If there are many pending changes, this is (at least with PostgreSQL)
- # much faster.)
- cursor = connection.select_cursor(
- :table => change_log_table,
- :from => {'id' => current_id},
- :exclude_starting_row => true,
- :row_buffer_size => 1
- )
- return unless cursor.next?
-
- # Something is here. Let's actually load it.
- cursor = connection.select_cursor(
- :table => change_log_table,
- :from => {'id' => current_id},
- :exclude_starting_row => true,
- :type_cast => true
- )
- while cursor.next?
- change = cursor.next_row
- self.current_id = change['id']
- self.change_array << change
- change['array_index'] = self.change_array.size - 1
-
- table_change_tree = change_tree[change['change_table']] ||= {}
- key_changes = table_change_tree[change['change_key']] ||= []
- key_changes << change
- end
- cursor.clear
- end
-
- # Returns the creation time of the oldest unprocessed change log record.
- def oldest_change_time
- change = oldest_change
- change['change_time'] if change
- end
-
- # Returns the oldest unprocessed change log record (column_name => value hash).
- def oldest_change
- update
- oldest_change = nil
- unless change_array.empty?
- while (oldest_change = change_array[self.current_index]) == nil
- self.current_index += 1
- end
- end
- oldest_change
- end
-
- # Returns the specified change log record (column_name => value hash).
- # * +change_table+: the name of the table that was changed
- # * +change_key+: the change key of the modified record
- def load(change_table, change_key)
- update
- change = nil
- table_change_tree = change_tree[change_table]
- if table_change_tree
- key_changes = table_change_tree[change_key]
- if key_changes
- # get change object and delete from key_changes
- change = key_changes.shift
-
- # delete change from change_array
- change_array[change['array_index']] = nil
-
- # delete change from database
- connection.execute "delete from #{change_log_table} where id = #{change['id']}"
-
- # delete key_changes if empty
- if key_changes.empty?
- table_change_tree.delete change_key
- end
-
- # delete table_change_tree if empty
- if table_change_tree.empty?
- change_tree.delete change_table
- end
-
- # reset everything if no more changes remain
- if change_tree.empty?
- init_cache
- end
- end
- end
- change
- end
-
- # Returns the name of the change log table
- def change_log_table
- @change_log_table ||= "#{session.configuration.options[:rep_prefix]}_pending_changes"
- end
- private :change_log_table
- end
-
# Describes a single logged record change.
#
# Note:
# The change loading functionality depends on the current database session
# being executed in an open database transaction.
# Also at the end of change processing the transaction must be committed.
class LoggedChange
+ # The current LoggedChangeLoader
+ attr_accessor :loader
+
# The current Session
- attr_accessor :session
+ def session
+ @session ||= loader.session
+ end
- # The database which was changed. Either :+left+ or :+right+.
- attr_accessor :database
+ # The current database (either +:left+ or +:right+)
+ def database
+ @database ||= loader.database
+ end
# The name of the changed table
attr_accessor :table
# When the first change to the record happened
@@ -211,15 +39,14 @@
# Only used for updates: a column_name => value hash of the original primary
# key of the updated record
attr_accessor :new_key
# Creates a new LoggedChange instance.
- # * +session+: the current Session
+ # * +loader+: the current LoggedChangeLoader
# * +database+: either :+left+ or :+right+
- def initialize(session, database)
- self.session = session
- self.database = database
+ def initialize(loader)
+ self.loader = loader
self.type = :no_change
end
# A hash describing how the change state morph based on newly found change
# records.
@@ -279,11 +106,11 @@
org_key = session.send(database).primary_key_names(table).map do |key_name|
"#{key_name}#{key_sep}#{org_key[key_name]}"
end.join(key_sep)
current_key = org_key
- while change = session.change_loader(database).load(table, current_key)
+ while change = loader.load(table, current_key)
new_type = change['change_type']
current_type = TYPE_CHANGES["#{current_type}#{new_type}"]
self.first_changed_at ||= change['change_time']
@@ -311,29 +138,23 @@
self.table = table
self.key = key
load
end
- # Returns the time of the oldest change. Returns +nil+ if there are no
- # changes left.
- def oldest_change_time
- session.change_loader(database).oldest_change_time
- end
-
# Loads the oldest available change
def load_oldest
begin
- change = session.change_loader(database).oldest_change
+ change = loader.oldest_change
break unless change
self.key = key_to_hash(change['change_key'])
self.table = change['change_table']
load
end until type != :no_change
end
# Prevents session from going into YAML output
def to_yaml_properties
- instance_variables.sort.reject {|var_name| var_name == '@session'}
+ instance_variables.sort.reject {|var_name| ['@session', '@loader'].include? var_name}
end
end
end
\ No newline at end of file