module PgConn
  # Role (user) administration helpers that issue SQL through a connection
  # object. The connection is expected to respond to #value, #values and
  # #exec, which are the only methods called on it here.
  class RoleMethods
    attr_reader :conn

    # conn:: connection object used for every query issued by this class
    def initialize(conn)
      @conn = conn
      # TODO: Check if conn is a superuser connection
    end

    # Return true if all the given role(s) exist. Nested arrays are flattened,
    # nil entries are ignored and duplicate names are counted once.
    #
    # superuser:: true/false restricts the match to roles that are/aren't
    #             superusers, any other value disables the filter
    # can_login:: true/false restricts the match to roles that can/can't
    #             login, any other value disables the filter
    #
    # Returns false without querying when no role name is given, so that a
    # nil name is never reported as an existing role
    def exist?(*rolenames, superuser: nil, can_login: nil)
      # Duplicates must be removed because the row count is compared with the
      # number of names below
      rolenames = rolenames.flatten.compact.uniq
      return false if rolenames.empty?

      where_clause = [
        "rolname in (#{PgConn.sql_values(rolenames)})",
        flag_clause("rolsuper", superuser),
        flag_clause("rolcanlogin", can_login)
      ].compact.join(" and ")
      conn.value("select count(*) from pg_roles where #{where_clause}") == rolenames.size
    end

    # Return true if the user exists and can login. The optional :superuser
    # filter is passed on to #exist?
    def can_login?(username, superuser: nil)
      exist?(username, superuser: superuser, can_login: true)
    end
    alias_method :user?, :can_login?

    # Return true if the user exists and is a superuser
    def superuser?(username)
      exist?(username, superuser: true)
    end

    # Create a new role. Every attribute is spelled out explicitly in the
    # statement (eg. "nologin" when :can_login is false). Returns the result
    # of conn.exec
    def create(rolename, superuser: false, create_database: false, can_login: false, create_role: false)
      # Double embedded quotes so the name stays a single quoted identifier
      quoted_name = rolename.to_s.gsub('"', '""')
      stmt = [
        "create role \"#{quoted_name}\"",
        superuser ? "superuser" : "nosuperuser",
        create_database ? "createdb" : "nocreatedb", # was computed but never emitted
        can_login ? "login" : "nologin",
        create_role ? "createrole" : "nocreaterole"
      ].join(" ")
      conn.exec stmt
    end

    # Remove all privileges from the given role. Not implemented yet: the
    # method does nothing and returns nil
    def clean(rolename)
    end

    # Drop existing users. Return true if any role was dropped. Drop depending
    # privileges and objects too if :cascade is true. Note that cascade only
    # works if connected to the database where the privileges exist. Names
    # that don't match an existing role are skipped and duplicates are dropped
    # only once.
    #
    # The :silent option is used in tests but is currently ignored here - fix
    # it somehow!
    def drop(*rolenames, cascade: false, silent: false)
      existing = rolenames.flatten.compact.uniq.select { |role| exist?(role) }
      return false if existing.empty?

      rolenames_sql = PgConn.sql_idents(existing)
      conn.exec "drop owned by #{rolenames_sql} cascade" if cascade
      conn.exec "drop role #{rolenames_sql}"
      true
    end

    # List role names.
    #
    # database::  when given, only roles named "<database>__<something>" are
    #             listed
    # owner::     when true together with :database, the role named exactly
    #             like the database is included too
    # superuser:: when not nil, filter on the rolsuper flag
    # can_login:: when not nil, filter on the rolcanlogin flag
    #
    # TODO Use RE instead of database argument
    # FIXME: Depends on the __ convention
    # NOTE(review): '_' is a single-character wildcard in LIKE so the pattern
    # also matches names where other characters follow the database name -
    # confirm whether a literal "__" match is intended
    def list(database: nil, owner: false, superuser: nil, can_login: nil)
      database_clause = nil
      if database
        # Double single quotes so the name can't terminate the SQL literal
        name = database.to_s.gsub("'", "''")
        database_clause = "rolname like '#{name}__%'"
        database_clause = "(#{database_clause} or rolname = '#{name}')" if owner
      end
      superuser_clause = ("rolsuper = #{superuser}" unless superuser.nil?)
      can_login_clause = ("rolcanlogin = #{can_login}" unless can_login.nil?)

      query = [
        "select rolname from pg_roles where true",
        database_clause,
        superuser_clause,
        can_login_clause
      ].compact.join(" and ")
      conn.values(query)
    end

    private

    # Map a tri-state flag to a SQL condition on a boolean column: true yields
    # the column, false its negation and anything else nil (no filter)
    def flag_clause(column, flag)
      case flag
      when true then column
      when false then "not #{column}"
      end
    end
  end
end