# PostgreSQL adapter for MicroSql, built on top of the "pg" gem.
#
# The adapter wraps a single PG connection (@impl). "?" placeholders in SQL
# are rewritten to PostgreSQL's positional "$1", "$2", ... form, and PG::Error
# exceptions raised while executing a statement are re-raised as Error.
class MicroSql::PgAdapter < MicroSql
  # Connects to the database described by +url+.
  #
  # Host, port, user and password are taken from the URL; the database name
  # is the URL path without its leading "/". When the URL carries no port,
  # PostgreSQL's standard port 5432 is used.
  def initialize(url)
    require 'pg' # loaded lazily, so the gem is only needed when this adapter is used

    uri = URI.parse(url)
    @impl = PG.connect :host => uri.host,
                       :port => uri.port || 5432,
                       :user => uri.user,
                       :password => uri.password,
                       :dbname => uri.path[1..-1]

    # Forward server notices to the MicroSql logger.
    @impl.set_notice_receiver { |result| MicroSql.logger.info(result.error_message) }
  end

  # Names of the transaction states, indexed by the integer returned from
  # the connection's #transaction_status.
  TRANSACTION_STATUSES = %w(PQTRANS_IDLE PQTRANS_ACTIVE PQTRANS_INTRANS PQTRANS_INERROR PQTRANS_UNKNOWN).freeze

  # Returns the underlying PG connection.
  def connection
    @impl
  end

  # Returns the name of the connection's current transaction state
  # (one of TRANSACTION_STATUSES).
  def transaction_status
    TRANSACTION_STATUSES[@impl.transaction_status]
  end

  # Returns true when the connection is inside a transaction block, whether
  # that transaction is still valid or already failed.
  def transaction?
    status = @impl.transaction_status
    status == PG::Connection::PQTRANS_INTRANS || status == PG::Connection::PQTRANS_INERROR
  end

  # Runs the block inside a transaction and returns the block's value.
  #
  # When a transaction is already open the block runs inside a savepoint
  # instead. A RollbackException raised by the block is swallowed and nil
  # is returned.
  def transaction(&block)
    return savepoint_transaction(&block) if transaction?

    begin
      r = nil
      @impl.transaction do
        r = yield
      end
      r
    rescue RollbackException
      nil
    end
  end

  private

  # Runs the block inside a savepoint of the current transaction and returns
  # the block's value. On RollbackException the work done since the savepoint
  # is rolled back and nil is returned.
  def savepoint_transaction(&block)
    # The name is derived from this adapter's object_id, so nested calls
    # reuse the same savepoint name.
    savepoint_name = "sqdb_#{object_id}"

    exec! "SAVEPOINT #{savepoint_name}"
    r = yield
    exec! "RELEASE SAVEPOINT #{savepoint_name}"
    r
  rescue RollbackException
    exec! "ROLLBACK TO SAVEPOINT #{savepoint_name}"
    nil
  end

  public

  # Returns the names of all tables listed in pg_tables whose name starts
  # with neither "pg_" nor "sql_".
  def tables
    exec("SELECT tablename FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND tablename NOT LIKE 'sql_%'").map(&:first)
  end

  # Returns the names of the primary key columns of +table+.
  def primary_keys(table)
    sql = <<-SQL
      SELECT pg_attribute.attname
      FROM pg_index, pg_class, pg_attribute
      WHERE pg_class.oid = ?::regclass
        AND indrelid = pg_class.oid
        AND pg_attribute.attrelid = pg_class.oid
        AND pg_attribute.attnum = any(pg_index.indkey)
        AND indisprimary
    SQL

    exec(sql, table).map(&:first)
  end

  # Returns the name of the single primary key column of +table+.
  #
  # Raises Error when the table has more than one primary key column or
  # none at all.
  def primary_key(table)
    keys = primary_keys(table)
    raise(Error, "No support for primary key (in table #{table})") if keys.length > 1

    keys.first || raise(Error, "No primary key in table #{table}")
  end

  private

  # Extends the INSERT statement built by the superclass with a RETURNING
  # clause, so that the primary key of the new row is sent back.
  def insert_sql(table_name, *args)
    "#{super} RETURNING #{table(table_name).primary_key}"
  end

  public

  # Executes each ";"-separated part of +sql+ in order.
  #
  # NOTE: the split is purely textual; a ";" inside a string literal or a
  # function body also splits the statement.
  def execute_batch(sql)
    sql.split(";").each { |part| exec!(part) }
  end

  private

  # Runs #execute_ and converts PG::Error exceptions into Error.
  def execute(flag, sql, *args)
    execute_(flag, sql, *args)
  rescue PG::Error => e
    raise Error, e.message
  end

  # Executes +sql+ with +args+ bound to its placeholders.
  #
  # Returns the number of affected rows for UPDATE and DELETE statements,
  # and the converted result records otherwise (limited to the first record
  # when +flag+ is :ask). The result of an INSERT is passed through
  # format_results_for_ask.
  def execute_(flag, sql, *args)
    adjusted_sql = replace_placeholders(sql)

    # Only statements with placeholders, run with the :prepare or :ask flag,
    # go through a prepared statement.
    result = if adjusted_sql && (flag == :prepare || flag == :ask) && (prepared_statement = prepare(adjusted_sql))
      @impl.exec_prepared(prepared_statement, args)
    else
      @impl.exec(adjusted_sql || sql, args)
    end

    # NOTE: the statement type is detected case-sensitively.
    case sql
    when /^\s*UPDATE\b/, /^\s*DELETE\b/
      return result.cmd_tuples
    when /^\s*INSERT\b/
      # postgresql has a different way of returning newly inserted ids; e.g.
      #   "INSERT ... RETURNING id"
      # The returned id value is returned to the pg driver the same as
      # a SELECT would be. We therefore evaluate only the first record.
      flag = :insert
      records = result.values
    else
      records = result.values
      records = records[0, 1] if flag == :ask
    end

    result = convert_records result, records
    flag == :insert ? format_results_for_ask(result) : result
  end

  # Converters from the string values delivered by the pg driver to Ruby
  # values. Each converter is named after the symbol returned by
  # #conversion_for.
  module Conversion
    extend self

    def nop; end
    def string(s); s; end
    def unknown(s); s; end
    def integer(s); s.to_i; end
    def bytea(s); PG::Connection.unescape_bytea(s); end
  end

  # Returns the name of the Conversion method to apply to a column with the
  # type OID +ftype+ and type modifier +fmod+; :nop means the value is used
  # as delivered.
  #
  # Raises a RuntimeError when the type maps to a converter that Conversion
  # does not respond to, or when the type is not mapped at all (except for
  # the "unknown" type, which is treated as :nop).
  def conversion_for(ftype, fmod)
    sym = case ftype
    when 16 then :boolean
    when 17 then :bytea
    when 19 then :nop
    when 20, 21, 22, 23, 26 then :integer
    when 25 then :nop # "text"
    when 700, 701 then :float
    when 790, 1700 then :big_decimal
    when 1083, 1266 then :string_to_time
    end

    return sym if sym && Conversion.respond_to?(sym)
    raise "Unsupported conversion #{sym.inspect}" if sym

    # Unmapped type: ask the server for its name to build a useful message.
    typename = @impl.exec("SELECT format_type($1,$2)", [ftype, fmod]).getvalue(0, 0)
    return :nop if typename == "unknown"

    raise "Unsupported format_type #{typename.inspect} (ftype, fmod: #{ftype}, #{fmod})"
  end

  # Converts the column values of +records+ in place, according to the
  # column types reported by +result+, and returns the records.
  #
  # SQL NULL values (nil) are kept as nil rather than being run through a
  # converter; otherwise a NULL integer column would be reported as 0.
  def convert_records(result, records)
    # Collect [converter, column index] pairs for all columns that need
    # a conversion.
    converters = (0...result.num_fields).map do |i|
      conversion = conversion_for result.ftype(i), result.fmod(i)
      [conversion, i] unless conversion == :nop
    end.compact

    records.map do |record|
      converters.each do |type, idx|
        value = record[idx]
        record[idx] = Conversion.send(type, value) unless value.nil?
      end
      record
    end
  end

  # Rewrites each "?" in +sql+ to a numbered "$n" placeholder.
  #
  # Returns the rewritten SQL, or nil when +sql+ contains no "?".
  #
  # NOTE: every "?" is replaced, including those inside string literals.
  def replace_placeholders(sql)
    idx = 0
    query = sql.gsub("?") do
      idx += 1
      "$#{idx}"
    end

    idx == 0 ? nil : query
  end

  # Prepares +sql+ on the connection under the statement name +key+ and
  # returns +key+.
  def prepare_query(key, sql)
    @impl.prepare(key, sql)
    key
  end
end