#!/usr/bin/env ruby

require 'sync'

# Persistence entry points for xampl objects.
#
# The module keeps a registry of named persisters and of persister classes
# (keyed by their `kind`), and tracks the "current" persister per thread in
# Thread.current[:persister]. Most module functions simply delegate to the
# current persister and do nothing when there is none.
module Xampl

  #@@persister = nil
  @@known_persisters = {}   # name => persister instance
  @@persister_kinds = {}    # kind => persister class

  # Returns the persister active on the calling thread, or nil.
  def Xampl.persister
    Thread.current[:persister]
  end

  # Sets `block_changes` on the current persister (no-op without one).
  def Xampl.block_future_changes(on=true)
    Thread.current[:persister].block_changes = on if Thread.current[:persister]
  end

  # Sets `automatic` on the current persister (no-op without one).
  def Xampl.auto_persistence(on=true)
    Thread.current[:persister].automatic = on if Thread.current[:persister]
  end

  # Registers a persister class under the key returned by `klass.kind`.
  def Xampl.register_persister_kind(klass)
    @@persister_kinds[klass.kind] = klass
  end

  # Clears the calling thread's persister and forgets every named persister.
  def Xampl.disable_all_persisters
    Thread.current[:persister] = nil
    @@known_persisters = {}
  end

  # Clears the calling thread's persister only.
  def Xampl.disable_persister
    Thread.current[:persister] = nil
  end

  @@default_persister_kind = :simple
  @@default_persister_format = :xml_format

  def Xampl.default_persister_kind
    @@default_persister_kind
  end

  def Xampl.set_default_persister_kind(kind)
    @@default_persister_kind = kind
    puts "SET KIND format: #{@@default_persister_format}, kind: #{@@default_persister_kind}"
  end

  def Xampl.default_persister_format
    @@default_persister_format
  end

  def Xampl.set_default_persister_format(format)
    @@default_persister_format = format
    puts "SET FORMAT format: #{@@default_persister_format}, kind: #{@@default_persister_kind}"
  end

  @@xampl_lock = Sync.new

  # Makes the persister called `name` current for this thread, creating and
  # registering it when it is not yet known.
  #
  # name   - registry key; a persister created with a nil name is not registered.
  # kind   - persister kind, defaults to the module default.
  # format - representation format, defaults to the module default.
  #
  # Returns the persister that was current before the call.
  # Raises IncompatiblePersisterRequest when a known persister of that name has
  # a different kind or format.
  def Xampl.enable_persister(name, kind=nil, format=nil)
    initial_persister = nil

    @@xampl_lock.synchronize(:EX) do
      initial_persister = Thread.current[:persister]

      kind = kind || @@default_persister_kind
      format = format || @@default_persister_format

      # Validate/create on a local first: the thread's current persister is only
      # replaced once the request is known to be satisfiable, so a raise below
      # leaves the thread exactly as it was.
      persister = @@known_persisters[name]

      if persister then
        if kind and kind != persister.kind then
          raise IncompatiblePersisterRequest.new(persister, "kind", kind, persister.kind)
        end
        if format and format != persister.format then
          raise IncompatiblePersisterRequest.new(persister, "format", format, persister.format)
        end
      else
        # puts "CREATE PERSISTER #{name}, format: #{format}, kind: #{kind}"
        persister = @@persister_kinds[kind].new(name, format)
        @@known_persisters[name] = persister unless name.nil?
      end

      Thread.current[:persister] = persister
    end

    return initial_persister
  end

  # Prints the registry of named persisters to stdout.
  def Xampl.print_known_persisters
    puts "Known Persisters:: --------------------------"
    @@known_persisters.each { | n, k | puts " #{n} #{k}" }
    puts "---------------------------------------------"
  end

  # Removes the current persister from the registry, printing the registry
  # before and after. Without a current persister nothing is removed.
  def Xampl.flush_persister_caches
    Xampl.print_known_persisters
    current = Thread.current[:persister]
    @@known_persisters.delete(current.name) if current
    Xampl.print_known_persisters
  end

  # Prints and then empties the registry, followed by three GC runs.
  def Xampl.drop_all_persisters
    puts "Drop All Persisters:: --------------------------"
    @@known_persisters.each { | n, k | puts " #{n} #{k}" }
    puts "---------------------------------------------"
    @@known_persisters = {}
    GC.start
    GC.start
    GC.start
  end

  # Removes the persister called `name` from the registry, printing the
  # registry before and after.
  def Xampl.drop_persister(name)
    Xampl.print_known_persisters
    @@known_persisters.delete(name)
    Xampl.print_known_persisters
  end

  # Resolves the persister name a transaction should run against: a String is
  # the name itself, a XamplObject contributes its persister's name.
  #
  # Raises XamplException for anything else.
  def Xampl.persister_name_for(thing)
    case thing
    when String then
      thing
    when XamplObject then
      thing.persister.name
    else
      raise XamplException.new("can't base a transaction on a #{thing.class.name} (#{thing})")
    end
  end
  private_class_method :persister_name_for

  # Runs the block against the persister identified by `thing`, syncing on
  # success and rolling back otherwise. Without a block nothing happens.
  #
  # thing     - persister name (String) or a XamplObject.
  # kind      - passed to enable_persister.
  # automatic - value of the persister's `automatic` flag while the block runs.
  # format    - passed to enable_persister.
  #
  # On success the block's value is returned. A StandardError raised by the
  # block (or by the sync) is rescued, printed with its backtrace, followed by
  # a rollback, and is NOT re-raised. The previously current persister and the
  # persister's flags are restored in every case.
  #
  # NOTE(review): the exception classes in this file derive from Exception, so
  # they are not caught by the `rescue` here; they propagate after the rollback
  # ("UNKNOWN CAUSE" is printed for them).
  def Xampl.transaction(thing, kind=nil, automatic=true, format=nil, &block)
    name = persister_name_for(thing)

    if block_given? then
      @@xampl_lock.synchronize(:EX) do
        initial_persister = Xampl.enable_persister(name, kind, format)
        Thread.current[:persister].lock.synchronize(:EX) do
          rollback = true
          exception = nil
          original_automatic = Thread.current[:persister].automatic
          begin
            #TODO -- impose some rules on nested transactions/enable_persisters??

            Xampl.auto_persistence(automatic)
            result = yield
            Xampl.block_future_changes(true)
            Xampl.sync
            rollback = false
            return result
          rescue => e
            exception = e
          ensure
            Xampl.block_future_changes(false)
            Xampl.auto_persistence(original_automatic)
            if rollback then
              if exception then
                puts "ROLLBACK(#{__LINE__}):: #{exception}"
                print exception.backtrace.join("\n")
              else
                puts "ROLLBACK(#{__LINE__}):: UNKNOWN CAUSE"
              end
              Xampl.rollback
            end
            Thread.current[:persister] = initial_persister
          end
        end
      end # sync xampl_lock
    end
  end

  # Shared body of read_only_transaction and read_only: runs the block with
  # automatic persistence switched off, then rolls back if the block did not
  # complete or if changes were recorded, and raises BlockedChange in the
  # latter case.
  #
  # reported_persister - object handed to BlockedChange; defaults to the
  #                      persister enabled for the block.
  #
  # NOTE(review): `@changed` here is an instance variable of the Xampl module
  # itself (self is Xampl in these functions), not the persister's change set.
  # Nothing in this file adds to it, so the BlockedChange branch only fires if
  # code elsewhere writes to it — confirm this is the intended change tracker.
  def Xampl.run_read_only(name, kind, format, reported_persister=nil)
    @@xampl_lock.synchronize(:SH) do
      initial_persister = Xampl.enable_persister(name, kind, format)
      Thread.current[:persister].lock.synchronize(:EX) do
        reported_persister ||= Thread.current[:persister]
        rollback = true
        original_automatic = Thread.current[:persister].automatic

        @changed ||= nil
        original_changed = @changed
        @changed = {}
        begin
          Xampl.auto_persistence(false)
          #Xampl.block_future_changes(true)
          yield
          rollback = false
        ensure
          Xampl.auto_persistence(original_automatic)
          #Xampl.block_future_changes(false)

          changed_count = @changed.size
          puts "CHANGED COUNT: #{changed_count}" if 0 < changed_count
          @changed = original_changed

          puts "ROLLBACK(#{__LINE__})" if rollback
          # Recorded changes force a rollback even when the block finished.
          Xampl.rollback if rollback or 0 < changed_count
          Thread.current[:persister] = initial_persister

          raise BlockedChange.new(reported_persister) if 0 < changed_count
        end
      end
    end # sync xampl_lock
  end
  private_class_method :run_read_only

  # Runs the block against the persister identified by `thing` with automatic
  # persistence off; see run_read_only for the rollback/BlockedChange rules.
  # Without a block nothing happens. The `automatic` parameter is accepted but
  # not used.
  def Xampl.read_only_transaction(thing, kind=nil, automatic=true, format=nil, &block)
    name = persister_name_for(thing)
    return nil unless block_given?

    run_read_only(name, kind, format, &block)
  end

  # Like read_only_transaction, but takes the persister itself; its name, kind
  # and format select the persister to enable, and it is the object reported
  # in BlockedChange.
  def Xampl.read_only(target_persister)
    name = target_persister.name
    return nil unless block_given?

    run_read_only(name, target_persister.kind, target_persister.format, target_persister) { yield }
  end

  def Xampl.introduce_to_persister(xampl)
    Thread.current[:persister].introduce(xampl) if Thread.current[:persister]
  end

  def Xampl.count_changed
    Thread.current[:persister].count_changed if Thread.current[:persister]
  end

  def Xampl.print_stats
    Thread.current[:persister].print_stats if Thread.current[:persister]
  end

  # Adopts `xampl` into the current persister when it has none and the current
  # persister is automatic, then caches it in its (automatic) persister.
  def Xampl.auto_cache(xampl)
    if (nil == xampl.persister) and Thread.current[:persister] and Thread.current[:persister].automatic then
      xampl.persister = Thread.current[:persister]
    end
    if xampl.persister and xampl.persister.automatic then
      xampl.persister.cache(xampl)
    end
  end

  def Xampl.auto_uncache(xampl)
    if xampl.persister and xampl.persister.automatic then
      xampl.persister.uncache(xampl)
    end
  end

  def Xampl.clear_cache
    Thread.current[:persister].clear_cache if Thread.current[:persister]
  end

  def Xampl.sync
    #raise XamplException.new(:live_across_rollback) if @@persister.rolled_back
    Thread.current[:persister].sync if Thread.current[:persister]
  end

  def Xampl.sync_all
    @@known_persisters.each{ | name, persister | persister.sync }
  end

  # Discards the pending changes of `persister` (default: the current one).
  #
  # Raises NoActivePersister when there is no persister to roll back.
  def Xampl.rollback(persister=Thread.current[:persister])
    raise NoActivePersister unless persister
    persister.rollback_cleanup
  end

  def Xampl.rollback_all
    @@known_persisters.values.each{ | persister | persister.rollback }
  end

  # Asks the current persister to load `xampl` in place and clears its
  # `load_needed` flag. Prints a refusal when there is no xampl, no current
  # persister, or the xampl has no index.
  def Xampl.lazy_load(xampl)
    @@xampl_lock.synchronize(:SH) do
      persister = Thread.current[:persister]
      # Check before touching persister.lock / xampl.get_the_index so a missing
      # persister or xampl is refused instead of raising NoMethodError.
      if xampl and persister then
        persister.lock.synchronize(:EX) do
          pid = xampl.get_the_index
          if pid then
            persister.lazy_load(xampl, xampl.class, pid)
            xampl.load_needed = false
          else
            puts "XAMPL.LAZY_LOAD -- REFUSED"
          end
        end
      else
        puts "XAMPL.LAZY_LOAD -- REFUSED"
      end
    end
  end

  def Xampl.lookup(klass, pid)
    Thread.current[:persister].lookup(klass, pid) if Thread.current[:persister]
  end

  # Returns the first value yielded by the current persister's find_known, or
  # nil when there is no current persister.
  def Xampl.find_known(klass, pid)
    xampl, ignore = Thread.current[:persister].find_known(klass, pid) if Thread.current[:persister]
    return xampl
  end

  def Xampl.write_to_cache(xampl)
    Thread.current[:persister].write_to_cache(xampl)
  end

  def Xampl.cache(xampl)
    Thread.current[:persister].cache(xampl)
  end

  # Looks up map[module_name][tag][pid] for the given class; nil when pid is
  # nil or any level is missing.
  def Xampl.lookup_in_map(map, klass, pid)
    return nil if nil == pid

    tag_map = map[klass.module_name]
    return nil if nil == tag_map

    pid_map = tag_map[klass.tag]
    return nil if nil == pid_map

    return pid_map[pid]
  end

  # Stores `xampl` (or the block's value, when a block is given) at
  # map[module_name][tag][pid], creating intermediate hashes as needed.
  #
  # Returns false without storing when the xampl has no index, true otherwise.
  def Xampl.store_in_map(map, xampl)
    module_name = xampl.module_name
    tag = xampl.tag
    pid = xampl.get_the_index

    return false if nil == pid

    data = block_given? ? yield : xampl

    tag_map = map[module_name]
    if nil == tag_map then
      tag_map = {}
      map[module_name] = tag_map
    end

    pid_map = tag_map[tag]
    if nil == pid_map then
      pid_map = {}
      tag_map[tag] = pid_map
    end

    pid_map[pid] = data
    return true
  end

  # Same as store_in_map, except that a missing innermost map is obtained from
  # `container.fresh_cache` instead of being a plain Hash.
  def Xampl.store_in_cache(map, xampl, container)
    module_name = xampl.module_name
    tag = xampl.tag
    pid = xampl.get_the_index

    return false if nil == pid

    data = block_given? ? yield : xampl

    tag_map = map[module_name]
    if nil == tag_map then
      tag_map = {}
      map[module_name] = tag_map
    end

    pid_map = tag_map[tag]
    if nil == pid_map then
      pid_map = container.fresh_cache
      tag_map[tag] = pid_map
    end

    pid_map[pid] = data
    return true
  end

  # Deletes map[module_name][tag][pid] for `xampl` and returns the deleted
  # value; nil when the xampl has no index or any level is missing.
  def Xampl.remove_from_map(map, xampl)
    pid = xampl.get_the_index
    return nil unless pid

    tag_map = map[xampl.module_name]
    return nil unless tag_map

    pid_map = tag_map[xampl.tag]
    return nil unless pid_map

    return pid_map.delete(pid)
  end

  # Base class for persisters: tracks changed xampl objects and read/write
  # statistics, and drives sync/rollback. The caching hooks and sync_done raise
  # XamplException(:unimplemented) here.
  class Persister
    attr_accessor :name,
                  :automatic,
                  :block_changes,
                  :read_count, :total_read_count,
                  :write_count, :total_write_count,
                  :total_sync_count, :total_rollback_count,
                  :cache_hits, :total_cache_hits,
                  :last_write_count,
                  :rolled_back
    attr_reader :syncing, :format, :lock

    def initialize(name=nil, format=nil)
      @name = name
      @format = format
      @automatic = false
      @changed = {}
      @cache_hits = 0
      @total_cache_hits = 0
      @read_count = 0
      @total_read_count = 0
      @write_count = 0
      @total_write_count = 0
      @last_write_count = 0
      @total_sync_count = 0
      @total_rollback_count = 0
      @rolled_back = false
      @syncing = false
      @lock = Sync.new

      @busy_count = 0
    end

    # Increments (yes) or decrements (no, never below zero) the busy counter.
    def busy(yes)
      if yes then
        @busy_count += 1
      elsif 0 < @busy_count then
        @busy_count -= 1
      end
    end

    def is_busy
      return 0 < @busy_count
    end

    # Caches `xampl` if it accepts this persister, and records it as changed
    # when it reports being changed.
    def introduce(xampl)
      if xampl.introduce_persister(self) then
        cache(xampl)
      end
      has_changed(xampl) if xampl.is_changed
    end

    # Records `xampl` in the change set when it both requires persisting and
    # is changed.
    #
    # Raises MixedPersisters when the xampl belongs to another persister.
    def has_changed(xampl)
      #raise XamplException.new(:live_across_rollback) if @rolled_back
      # puts "!!!! has_changed #{xampl} #{xampl.get_the_index} -- persist required: #{xampl.persist_required}"
      if xampl.persist_required && xampl.is_changed then
        unless self == xampl.persister
          raise MixedPersisters.new(xampl.persister, self)
        end
        @changed[xampl] = xampl
        # puts "!!!! change recorded ==> #{@changed.size}/#{count_changed} #{@changed.object_id} !!!!"
        # @changed.each{ | thing, ignore |
        #   puts " changed: #{thing}, index: #{thing.get_the_index}, changed: #{thing.is_changed}"
        # }
      end
    end

    def has_not_changed(xampl)
      # puts "!!!! has_not_changed #{xampl} #{xampl.get_the_index} -- in @changed: #{nil != @changed[xampl]}"
      @changed.delete(xampl) if xampl
    end

    def count_changed
      # @changed.each{ | thing, ignore |
      #   puts "changed: #{thing}, index: #{thing.get_the_index}"
      # }
      return @changed.size
    end

    def cache(xampl)
      raise XamplException.new(:unimplemented)
    end

    def uncache(xampl)
      raise XamplException.new(:unimplemented)
    end

    def clear_cache
      raise XamplException.new(:unimplemented)
    end

    # Replaces `old_xampl` with `new_xampl` under the same index in the current
    # thread's persister. A live (already loaded) old xampl is uncached and
    # invalidated, with a warning logged.
    #
    # Raises MixedPersisters unless both belong to the current persister.
    def Persister.replace(old_xampl, new_xampl)
      pid = old_xampl.get_the_index
      if old_xampl.persister != Thread.current[:persister] then
        raise MixedPersisters.new(Thread.current[:persister], old_xampl.persister)
      end
      if new_xampl.persister != Thread.current[:persister] then
        raise MixedPersisters.new(Thread.current[:persister], new_xampl.persister)
      end

      new_xampl.note_replacing(old_xampl)

      unless old_xampl.load_needed then
        Xampl.log.warn("Replacing live xampl: #{old_xampl} pid: #{pid}")
        Thread.current[:persister].uncache(old_xampl)
        old_xampl.invalidate
      end

      new_xampl.pid = nil
      new_xampl.pid = pid
      Thread.current[:persister].introduce(new_xampl)
    end

    # Serialises `xampl` according to @format (nil counts as :xml_format).
    # Returns nil for a format other than the three handled here.
    def represent(xampl)
      case @format
      when nil, :xml_format then
        return xampl.persist
      when :ruby_format then
        return xampl.to_ruby
      when :yaml_format then
        return xampl.as_yaml
      end
    end

    def realise(representation, target=nil)
      # Normally we'd expect to see the representation in the @format format, but
      # that isn't necessarily the case. Try to work out what the format might be...
      #
      # FIXME(review): the source text is truncated at this point. All that
      # survives of this method's body is its opening `if representation =~ /^`;
      # the next surviving text is the tail of some later method (the end of a
      # string, `#{xampl}"`, then `return xampl` and `end`). Everything in
      # between is missing and has deliberately NOT been reconstructed.
      # This file calls `write`, `lookup`, `find_known`, `lazy_load` and
      # `write_to_cache` on persisters, none of which appear in the surviving
      # text; they may have been in the lost span or may live in subclasses.
      # Restore the span from version control before relying on this class.
      raise XamplException.new(:unimplemented)
    end

    # Prints the tag and index of every xampl in the change set.
    def put_changed(msg="")
      puts "Changed::#{msg}:"
      @changed.each { | xampl, ignore | puts " #{xampl.tag} #{xampl.get_the_index}" }
    end

    # Writes every xampl in the change set, skipping InvalidXampl instances.
    def do_sync_write
      # Tallied alongside the commented-out diagnostics; not otherwise used.
      unchanged_in_changed_list = 0
      # puts "DO SYNC WRITE:: changed: #{@changed.size}, #{@changed.object_id}"
      @changed.each { | xampl, ignore |
        # puts " WRITE: #{xampl}, index: #{xampl.get_the_index}, changed: #{xampl.is_changed}"
        unchanged_in_changed_list += 1 unless xampl.is_changed
        write(xampl) unless xampl.kind_of?(InvalidXampl)
      }
    end

    # Writes out the change set, empties it, folds the per-sync counters into
    # the totals and calls sync_done.
    #
    # Returns the number of writes performed by this sync.
    def sync
      #raise XamplException.new(:live_across_rollback) if @rolled_back
      begin
        #puts "SYNC"
        #puts "SYNC changed: #{@changed.size}"
        #@changed.each do | key, value |
        #  #puts "key: #{key.class.name}, value: #{value.class.name}"
        #  puts key.to_xml
        #end
        #puts "SYNC"
        busy(true)
        @syncing = true

        do_sync_write

        @changed = {}

        @total_read_count = @total_read_count + @read_count
        @total_write_count = @total_write_count + @write_count
        @total_cache_hits = @total_cache_hits + @cache_hits
        @total_sync_count = @total_sync_count + 1

        @read_count = 0
        @last_write_count = @write_count
        @write_count = 0

        self.sync_done()

        return @last_write_count
      ensure
        busy(false)
        @syncing = false
      end
    end

    def sync_done
      raise XamplException.new(:unimplemented)
    end

    # Rolls this persister back via Xampl.rollback while marked busy.
    def rollback
      begin
        busy(true)
        return Xampl.rollback(self)
      ensure
        busy(false)
      end
    end

    # Discards the change set.
    def rollback_cleanup
      @changed = {}
    end

    def print_stats
      printf("SYNC:: TOTAL cache_hits: %d, reads: %d, writes: %d\n",
             @total_cache_hits, @total_read_count, @total_write_count)
      printf(" cache_hits: %d, reads: %d, last writes: %d\n",
             @cache_hits, @read_count, @last_write_count)
      printf(" syncs: %d\n", @total_sync_count)
      printf(" changed count: %d (%d)\n", count_changed, @changed.size)
      @changed.each{ | thing, ignore |
        if thing.is_changed then
          puts " changed: #{thing}, index: #{thing.get_the_index}"
        else
          puts " UNCHANGED: #{thing}, index: #{thing.get_the_index} <<<<<<<<<<<<<<<<<<< BAD!"
        end
      }
    end
  end

  # NOTE(review): the exception classes below derive from Exception rather than
  # StandardError, so a bare `rescue` (as in Xampl.transaction) does not catch
  # them. Left as is, because changing the parent would change which errors
  # Xampl.transaction swallows.

  class NoActivePersister < Exception
    def message
      "No Persister is active"
    end
  end

  class BlockedChange < Exception
    attr_reader :xampl

    def initialize(xampl=nil)
      @xampl = xampl
    end

    def message
      # This file raises BlockedChange with a persister, which has no
      # get_the_index; only ask objects that provide it.
      pid = @xampl.respond_to?(:get_the_index) ? @xampl.get_the_index : nil
      "attempt to change #{@xampl}, pid: #{pid}, oid: #{@xampl.object_id} when changes are blocked"
    end
  end

  class UnmanagedChange < Exception
    attr_reader :xampl

    def initialize(xampl=nil)
      @xampl = xampl
    end

    def message
      pid = @xampl.respond_to?(:get_the_index) ? @xampl.get_the_index : nil
      "attempt to change #{@xampl}, pid: #{pid}, oid: #{@xampl.object_id} outside of its persister's management"
    end
  end

  class IncompatiblePersisterRequest < Exception
    attr_reader :msg

    def initialize(persister, feature_name, requested_feature_value, actual_feature_value)
      @msg = "persister #{persister.name}:: requested feature: #{feature_name} #{requested_feature_value}, actual: #{actual_feature_value}"
    end

    def message
      @msg
    end
  end

  class MixedPersisters < Exception
    attr_reader :msg

    def initialize(active, local)
      # Either side may be nil (e.g. a xampl that has no persister yet); report
      # that instead of failing with NoMethodError while building the message.
      active_name = active ? active.name : "(none)"
      local_name = local ? local.name : "(none)"
      @msg = "mixed persisters:: active #{active_name}, local: #{local_name}"
    end

    def message
      @msg
    end
  end

  require "persister/simple"
  require "persister/in-memory"
  require "persister/filesystem"

end