lib/sp/job/pg_connection.rb in sp-job-0.2.3 vs lib/sp/job/pg_connection.rb in sp-job-0.3.22

- old
+ new

@@ -29,10 +29,11 @@ # # Public Attributes # attr_accessor :connection + attr_reader :config # # Prepare database connection configuration. # # @param owner @@ -56,10 +57,11 @@ @treshold = (new_min + (new_max - new_min) * rand).to_i else @treshold = new_min.to_i end end + @transaction_open = false end # # Establish a new database connection. # Previous one ( if any ) will be closed first. @@ -85,36 +87,60 @@ # # @param query the SQL query with data binding # @param args all the args for the query # @return query result. # - def exec (query, *args) + def execp (query, *args) @mutex.synchronize { if nil == @connection _connect() end _check_life_span() unless @id_cache.has_key? query id = "p#{Digest::MD5.hexdigest(query)}" - @connection.prepare(id, query) + begin + @connection.prepare(id, query) + rescue PG::DuplicatePstatement => ds + tmp_debug_str = "" + @id_cache.each do | k, v | + if v == id || k == query + tmp_debug_str += "#{v}: #{k}\n" + break + end + end + if 0 == tmp_debug_str.length + tmp_debug_str = "~~~\nAll Entries:\n" + @id_cache.to_s + else + tmp_debug_str = "~~~\nCached Entry:\n#{tmp_debug_str}" + end + tmp_debug_str += "~~~\nNew Entry: #{id}:#{query}\n" + raise "#{ds.message}\n#{tmp_debug_str}" + end @id_cache[query] = id else id = @id_cache[query] end @connection.exec_prepared(id, args) } end # - # Execute a query, + # Execute a normal SQL statement. # - # @param query + # @param query the SQL query with data binding + # @param args all the args for the query + # @return query result. # - def query (query:) + def exec (query, *args) @mutex.synchronize { - unless query.nil? - _check_life_span() + if nil == @connection + _connect() + end + _check_life_span() + if args.length > 0 + @connection.exec(sprintf(query, *args)) + else @connection.exec(query) end } end @@ -132,33 +158,87 @@ # def conn_str @config[:conn_str] end + # + # Call this to open a transaction + # + def begin + @mutex.synchronize { + if nil == @connection + _connect() + end + _check_life_span() + r = @connection.exec("BEGIN;") + if PG::PGRES_COMMAND_OK != r.result_status + raise "Unable to BEGIN a new transaction!" + end + @transaction_open = true + } + end + + # + # Call this to commit the currently open transaction + # + def commit + @mutex.synchronize { + if nil != @connection && true == @transaction_open + r = @connection.exec("COMMIT;") + if PG::PGRES_COMMAND_OK != r.result_status + raise "Unable to COMMIT a transaction!" + end + @transaction_open = false + end + } + end + + # + # Call this to open a transaction + # + def rollback + @mutex.synchronize { + if nil != @connection && true == @transaction_open + r = @connection.exec("ROLLBACK;") + if PG::PGRES_COMMAND_OK != r.result_status + raise "Unable to ROLLBACK a transaction!" + end + @transaction_open = false + end + } + + end + private - def _connect () + def _connect () _disconnect() @connection = PG.connect(@config[:conn_str]) end def _disconnect () + @transaction_open = false if @connection.nil? return end - @connection.exec("DEALLOCATE ALL") - @id_cache = {} + if @id_cache.size + @connection.exec("DEALLOCATE ALL") + @id_cache = {} + end @connection.close @connection = nil @counter = 0 end # # Check connection life span # def _check_life_span () + if true == @transaction_open + return + end return unless @treshold > 0 @counter += 1 if @counter > @treshold _connect() end