lib/spiderfw/model/storage/base_storage.rb in spiderfw-0.5.19 vs lib/spiderfw/model/storage/base_storage.rb in spiderfw-0.6.0

- old
+ new

@@ -1,20 +1,90 @@ +require 'spiderfw/model/storage/connection_pool' + module Spider; module Model; module Storage class BaseStorage include Spider::Logger attr_reader :url attr_accessor :instance_name - def self.sequence_sync - @sequence_sync ||= ::Sync.new - end + @capabilities = { + + } - def self.base_types - Model.base_types + class << self + # An Hash of storage capabilities. The default for db storages is + # {:autoincrement => false, :sequences => true, :transactions => true} + # (The BaseStorage class provides file sequences in case the subclass does not support them.) + attr_reader :capabilities + + def storage_type + :none + end + + def sequence_sync + @sequence_sync ||= ::Sync.new + end + + def base_types + Model.base_types + end + + # True if given named capability is supported by the Storage. + def supports?(capability) + @capabilities[capability] + end + + # Returns a new connection. Must be implemented by the subclasses; args are implementation specific. + def new_connection(*args) + raise "Unimplemented" + end + + def max_connections + nil + end + + def connection_pools + @pools ||= {} + end + + def get_connection(*args) + @pools ||= {} + @pools[args] ||= ConnectionPool.new(args, self) + @pools[args].get_connection + end + + # Frees a connection, relasing it to the pool + def release_connection(conn, conn_params) + return unless conn + return unless @pools && @pools[conn_params] + @pools[conn_params].release(conn) + end + + # Removes a connection from the pool. + def remove_connection(conn, conn_params) + return unless conn + return unless @pools && @pools[conn_params] + @pools[conn_params].remove(conn) + end + + def disconnect(conn) + raise "Virtual" + end + + # Checks whether a connection is still alive. Must be implemented by subclasses. + def connection_alive?(conn) + raise "Virtual" + end + + def inherited(subclass) + subclass.instance_variable_set("@capabilities", @capabilities) + end + end + def initialize(url) @url = url @configuration = {} parse_url(url) end @@ -29,20 +99,203 @@ def get_mapper(model) raise StorageException, "Unimplemented" end + def supports?(capability) + self.class.supports?(capability) + end + + def curr + var = nil + if Spider.conf.get('storage.shared_connection') + $STORAGES ||= {} + var = $STORAGES + else + var = Thread.current + end + var[:storages] ||= {} + var[:storages][self.class.storage_type] ||= {} + var[:storages][self.class.storage_type][@connection_params] ||= { + :transaction_nesting => 0, :savepoints => [] + } + end + + def connection_pool + self.class.connection_pools[@connection_params] + end + + # Instantiates a new connection with current connection params. + def connect + return self.class.get_connection(*@connection_params) + #Spider::Logger.debug("#{self.class.name} in thread #{Thread.current} acquired connection #{@conn}") + end + + # True if currently connected. + def connected? + curr[:conn] != nil + end + + + # Returns the current connection, or creates a new one. + # If a block is given, will release the connection after yielding. + def connection + curr[:conn] = connect + if block_given? + yield curr[:conn] + release # unless is_connected + return true + else + return curr[:conn] + end + end + + def self.connection_attributes + @connection_attributes ||= {} + end + + def connection_attributes + self.class.connection_attributes[connection] ||= {} + end + + # Releases the current connection to the pool. + def release + # The subclass should check if the connection is alive, and if it is not call remove_connection instead + c = curr[:conn] + #Spider.logger.debug("#{self} in thread #{Thread.current} releasing #{curr[:conn]}") + curr[:conn] = nil + self.class.release_connection(c, @connection_params) + #Spider.logger.debug("#{self} in thread #{Thread.current} released #{curr[:conn]}") + return nil + #@conn = nil + end + + # Prepares a value for saving. + def value_for_save(type, value, save_mode) + return prepare_value(type, value) + end + + # Prepares a value that will be used in a condition. + def value_for_condition(type, value) + return prepare_value(type, value) + end + + def value_to_mapper(type, value) + value + end + + def prepare_value(type, value) return value end def ==(storage) return false unless self.class == storage.class return false unless self.url == storage.url return true end + + def supports_transactions? + return self.class.supports?(:transactions) + end + + def transactions_enabled? + @configuration['enable_transactions'] && supports_transactions? + end + + def start_transaction + return unless transactions_enabled? + curr[:transaction_nesting] += 1 + return savepoint("point#{curr[:savepoints].length}") if in_transaction? + + Spider.logger.debug("#{self.class.name} starting transaction for connection #{connection.object_id}") + do_start_transaction + return true + end + + # May be implemented by subclasses. + def do_start_transaction + raise StorageException, "The current storage does not support transactions" + end + + def in_transaction + if in_transaction? + curr[:transaction_nesting] += 1 + return true + else + start_transaction + return false + end + end + + def in_transaction? + return false + end + + + def commit + return false unless transactions_enabled? + raise StorageException, "Commit without a transaction" unless in_transaction? + return curr[:savepoints].pop unless curr[:savepoints].empty? + commit! + end + + def commit_or_continue + return false unless transactions_enabled? + raise StorageException, "Commit without a transaction" unless in_transaction? + if curr[:transaction_nesting] == 1 + commit + curr[:transaction_nesting] = 0 + return true + else + curr[:transaction_nesting] -= 1 + end + end + + def commit! + Spider.logger.debug("#{self.class.name} commit connection #{curr[:conn].object_id}") + curr[:transaction_nesting] = 0 + do_commit + release + end + + def do_commit + raise StorageException, "The current storage does not support transactions" + end + + def rollback + raise "Can't rollback in a nested transaction" if curr[:transaction_nesting] > 1 + return rollback_savepoint(curr[:savepoints].last) unless curr[:savepoints].empty? + rollback! + end + + def rollback! + curr[:transaction_nesting] = 0 + Spider.logger.debug("#{self.class.name} rollback") + do_rollback + curr[:savepoints] = [] + release + end + + def do_rollback + raise StorageException, "The current storage does not support transactions" + end + + def savepoint(name) + curr[:savepoints] << name + end + + def rollback_savepoint(name=nil) + if name + curr[:savepoints] = curr[:savepoints][0,(curr[:savepoints].index(name))] + name + else + curr[:savepoints].pop + end + end + # Utility methods def sequence_file_path(name) path = 'var/sequences/'+name return path @@ -97,10 +350,10 @@ f.close end self.class.sequence_sync.lock(::Sync::UN) return seq end - + end ############################### # Exceptions #