lib/sp/job/pg_connection.rb in sp-job-0.2.3 vs lib/sp/job/pg_connection.rb in sp-job-0.3.22
- old
+ new
@@ -29,10 +29,11 @@
#
# Public Attributes
#
attr_accessor :connection
+ attr_reader :config
#
# Prepare database connection configuration.
#
# @param owner
@@ -56,10 +57,11 @@
@treshold = (new_min + (new_max - new_min) * rand).to_i
else
@treshold = new_min.to_i
end
end
+ @transaction_open = false
end
#
# Establish a new database connection.
# Previous one ( if any ) will be closed first.
@@ -85,36 +87,60 @@
#
# @param query the SQL query with data binding
# @param args all the args for the query
# @return query result.
#
- def exec (query, *args)
+ def execp (query, *args)
@mutex.synchronize {
if nil == @connection
_connect()
end
_check_life_span()
unless @id_cache.has_key? query
id = "p#{Digest::MD5.hexdigest(query)}"
- @connection.prepare(id, query)
+ begin
+ @connection.prepare(id, query)
+ rescue PG::DuplicatePstatement => ds
+ tmp_debug_str = ""
+ @id_cache.each do | k, v |
+ if v == id || k == query
+ tmp_debug_str += "#{v}: #{k}\n"
+ break
+ end
+ end
+ if 0 == tmp_debug_str.length
+ tmp_debug_str = "~~~\nAll Entries:\n" + @id_cache.to_s
+ else
+ tmp_debug_str = "~~~\nCached Entry:\n#{tmp_debug_str}"
+ end
+ tmp_debug_str += "~~~\nNew Entry: #{id}:#{query}\n"
+ raise "#{ds.message}\n#{tmp_debug_str}"
+ end
@id_cache[query] = id
else
id = @id_cache[query]
end
@connection.exec_prepared(id, args)
}
end
#
- # Execute a query,
+ # Execute a normal SQL statement.
#
- # @param query
+ # @param query the SQL query with data binding
+ # @param args all the args for the query
+ # @return query result.
#
- def query (query:)
+ def exec (query, *args)
@mutex.synchronize {
- unless query.nil?
- _check_life_span()
+ if nil == @connection
+ _connect()
+ end
+ _check_life_span()
+ if args.length > 0
+ @connection.exec(sprintf(query, *args))
+ else
@connection.exec(query)
end
}
end
@@ -132,33 +158,87 @@
#
def conn_str
@config[:conn_str]
end
+ #
+ # Call this to open a transaction
+ #
+ def begin
+ @mutex.synchronize {
+ if nil == @connection
+ _connect()
+ end
+ _check_life_span()
+ r = @connection.exec("BEGIN;")
+ if PG::PGRES_COMMAND_OK != r.result_status
+ raise "Unable to BEGIN a new transaction!"
+ end
+ @transaction_open = true
+ }
+ end
+
+ #
+ # Call this to commit the currently open transaction
+ #
+ def commit
+ @mutex.synchronize {
+ if nil != @connection && true == @transaction_open
+ r = @connection.exec("COMMIT;")
+ if PG::PGRES_COMMAND_OK != r.result_status
+ raise "Unable to COMMIT a transaction!"
+ end
+ @transaction_open = false
+ end
+ }
+ end
+
+ #
+ # Call this to open a transaction
+ #
+ def rollback
+ @mutex.synchronize {
+ if nil != @connection && true == @transaction_open
+ r = @connection.exec("ROLLBACK;")
+ if PG::PGRES_COMMAND_OK != r.result_status
+ raise "Unable to ROLLBACK a transaction!"
+ end
+ @transaction_open = false
+ end
+ }
+
+ end
+
private
- def _connect ()
+ def _connect ()
_disconnect()
@connection = PG.connect(@config[:conn_str])
end
def _disconnect ()
+ @transaction_open = false
if @connection.nil?
return
end
- @connection.exec("DEALLOCATE ALL")
- @id_cache = {}
+ if @id_cache.size
+ @connection.exec("DEALLOCATE ALL")
+ @id_cache = {}
+ end
@connection.close
@connection = nil
@counter = 0
end
#
# Check connection life span
#
def _check_life_span ()
+ if true == @transaction_open
+ return
+ end
return unless @treshold > 0
@counter += 1
if @counter > @treshold
_connect()
end