/*================================================================= Copyright (C) 2013 BizStation Corp All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. =================================================================*/ #include "appModule.h" #include <bzs/db/protocol/tdap/mysql/tdapCommandExecuter.h> #include "connManager.h" #include <bzs/db/engine/mysql/mysqlThd.h> #include <bzs/db/engine/mysql/mysqlProtocol.h> #include <bzs/db/protocol/tdap/mysql/databaseSchema.h> #include <bzs/db/engine/mysql/errorMessage.h> /* implemnts in transactd.cpp */ extern const char* get_trd_sys_var(int index); extern int getSlaveStatus(THD* thd, bzs::db::transactd::connection::records& recs); namespace bzs { using namespace netsvc::server; namespace db { using namespace protocol::tdap::mysql; using namespace engine::mysql; namespace transactd { connManager::~connManager() { } const module* getMod(unsigned __int64 conid) { for (size_t i = 0; i < modules.size(); i++) { const module* mod = dynamic_cast<module*>(modules[i]); if ((unsigned __int64)mod == conid) return mod; } return NULL; } const database* connManager::getDatabase(igetDatabases* dbm, int dbid) const { const databases& dbs = dbm->dbs(); if (dbid < (int)dbs.size()) return dbs[dbid].get(); return NULL; } void connManager::getConnectionList() const { for (size_t i = 0; i < modules.size(); i++) { const module* mod = dynamic_cast<module*>(modules[i]); if (mod && ((unsigned __int64)mod != m_me)) { m_records.push_back(connection::record()); connection::record& rec = m_records[m_records.size() - 1]; rec.conId = (unsigned __int64)mod; boost::asio::ip::address adr = mod->m_endpoint.address(); strncpy_s(rec.name, 64, adr.to_string().c_str(), 64); } } } void connManager::getDatabaseList(igetDatabases* dbm, const module* mod) const { const databases& dbs = dbm->dbs(); for (size_t j = 0; j < dbs.size(); j++) { if (dbs[j]) { const database* db = dbs[j].get(); m_records.push_back(connection::record()); connection::record& rec = m_records[m_records.size() - 1]; rec.conId = (unsigned __int64)mod; rec.id = db->clientID(); rec.db = (unsigned short)j; rec.status = 0; rec.inTransaction = db->inTransaction(); rec.inSnapshot = db->inSnapshot(); if (rec.inTransaction) { if (db->transactionIsolation() == ISO_REPEATABLE_READ) rec.type = MULTILOCK_GAP; else if (db->transactionIsolation() == ISO_READ_COMMITTED) { if (db->transactionType() == TRN_RECORD_LOCK_SINGLE) rec.type = SINGLELOCK_NOGAP; else rec.type = MULTILOCK_NOGAP; } } if (rec.inSnapshot) { if (db->transactionIsolation() == 0) rec.type = CONSISTENT_READ; else if (db->transactionIsolation() == ISO_REPEATABLE_READ) rec.type = MULTILOCK_GAP_SHARE; else if (db->transactionIsolation() == ISO_READ_COMMITTED) rec.type = MULTILOCK_NOGAP_SHARE; } strncpy_s(rec.name, 64, dbs[j]->name().c_str(), 64); } } } const connection::records& connManager::getRecords(unsigned __int64 conid, int dbid) const { //Lock module count boost::mutex::scoped_lock lck(modulesMutex); if (conid == 0) getConnectionList(); else { const module* mod = getMod(conid); if (mod) { igetDatabases* dbm = dynamic_cast<igetDatabases*>(mod->m_commandExecuter.get()); //Lock database add remove boost::mutex::scoped_lock lck(dbm->mutex()); if (dbid < 0) getDatabaseList(dbm, mod); else { const database* db = getDatabase(dbm, dbid); if (db) { const std::vector<boost::shared_ptr<table> >& tables = db->tables(); //Lock table add release in the db boost::mutex::scoped_lock lckt(database::tableRef.mutex()); for (size_t k = 0; k < tables.size(); k++) { const table* tb = tables[k].get(); if (tb) { m_records.push_back(connection::record()); connection::record& rec = m_records[m_records.size() - 1]; rec.id = tb->id(); rec.readCount = tb->readCount(); rec.updCount = tb->updCount(); rec.delCount = tb->delCount(); rec.insCount = tb->insCount(); rec.status = 0; rec.openNormal = (tb->mode() == TD_OPEN_NORMAL); rec.openReadOnly = (tb->mode() == TD_OPEN_READONLY); rec.openEx = (tb->mode() == TD_OPEN_EXCLUSIVE); rec.openReadOnlyEx = (tb->mode() == TD_OPEN_READONLY_EXCLUSIVE); strncpy_s(rec.name, 64, tb->name().c_str(), 64); } } } } } } return m_records; } const connection::records& connManager::systemVariables() const { m_records.clear(); for (int i = 0 ; i < TD_VAR_SIZE; ++i) { const char* p = ::get_trd_sys_var(i); if (p) { m_records.push_back(connection::record()); connection::record& rec = m_records[m_records.size() - 1]; rec.id = i; switch(i) { case TD_VER_DB: case TD_VAR_LISTENADDRESS: case TD_VAR_LISTENPORT: case TD_VAR_HOSTCHECKNAME: case TD_VAR_ISOLATION: case TD_VAR_AUTHTYPE: case TD_VAR_HSLISTENPORT: strncpy(rec.value, p , 65); break; case TD_VER_SERVER: sprintf_s(rec.value, 65, "%d.%d.%d", TRANSACTD_VER_MAJOR, TRANSACTD_VER_MINOR, TRANSACTD_VER_RELEASE); break; default: _ltoa_s(*((unsigned int*)p), rec.value, 65, 10); break; } } } return m_records; } const connection::records& connManager::definedDatabases() const { m_records.clear(); try { boost::shared_ptr<THD> thd(createThdForThread(), deleteThdForThread); if (thd != NULL) { cp_security_ctx(thd.get())->skip_grants(); readDbList(thd.get(), m_records); } } catch (bzs::rtl::exception& e) { const int* code = getCode(e); if (code) m_stat = *code; else { m_stat = 20000; sql_print_error("%s", boost::diagnostic_information(e).c_str()); } printWarningMessage(code, getMsg(e)); } catch (...) { m_stat = 20001; } return m_records; } const connection::records& connManager::getTableList(const char* dbname, int type) const { try { m_records.clear(); if (dbname && dbname[0]) { boost::shared_ptr<database> db(new database(dbname, 1)); if (db) { std::vector<std::string> tablelist; schemaBuilder::listTable(db.get(), tablelist, type); for (int i = 0; i < (int)tablelist.size(); ++i) { m_records.push_back(connection::record()); connection::record& rec = m_records[m_records.size() - 1]; strncpy(rec.name, tablelist[i].c_str(), 64); rec.name[64] = 0x00; rec.type = (short)type; } } } } catch (bzs::rtl::exception& e) { const int* code = getCode(e); if (code) m_stat = *code; else { m_stat = 20000; sql_print_error("%s", boost::diagnostic_information(e).c_str()); } printWarningMessage(code, getMsg(e)); } catch (...) { m_stat = 20001; } return m_records; } const connection::records& connManager::definedTables(const char* dbname, int type) const { return getTableList(dbname, type); } const connection::records& connManager::schemaTables(const char* dbname) const { return getTableList(dbname, TABLE_TYPE_TD_SCHEMA); } bool connManager::checkGlobalACL(THD* thd, ulong wantAccess) const { const module* mod = reinterpret_cast<const module*>(m_me); if (mod->isSkipGrants()) cp_security_ctx(thd)->skip_grants(); else ::setGrant(thd, mod->host(), mod->user(), NULL); bool ret = (cp_masterAccess(thd) & wantAccess) != 0; if (!ret) m_stat = STATUS_ACCESS_DENIED; return ret; } const connection::records& connManager::readSlaveStatus() const { m_records.clear(); try { boost::shared_ptr<THD> thd(createThdForThread(), deleteThdForThread); if (thd == NULL) { m_stat = 20001; return m_records; } if (!checkGlobalACL(thd.get(), SUPER_ACL | REPL_CLIENT_ACL)) return m_records; m_stat = errorCode(getSlaveStatus(thd.get(), m_records)); } catch (bzs::rtl::exception& e) { const int* code = getCode(e); if (code) m_stat = *code; else { m_stat = 20000; sql_print_error("%s", boost::diagnostic_information(e).c_str()); } printWarningMessage(code, getMsg(e)); } catch (...) { m_stat = 20001; } return m_records; } void connManager::doDisconnect(unsigned __int64 conid) { module* mod = const_cast<module*>(getMod(conid)); if (mod) { igetDatabases* dbm = dynamic_cast<igetDatabases*>(mod->m_commandExecuter.get()); const databases& dbs = dbm->dbs(); if (dbs.size()) { if (dbs[0]->checkAcl(SHUTDOWN_ACL)) mod->disconnect(); else m_stat = STATUS_ACCESS_DENIED; } } } void connManager::disconnect(unsigned __int64 conid) { boost::try_mutex::scoped_lock m(modulesMutex, boost::try_to_lock_t()); if (m.owns_lock()) doDisconnect(conid); else { for (int i = 0; i < 20; i++) { if (m.try_lock()) return doDisconnect(conid); Sleep(100); } } } void connManager::doDisconnectAll() { short stat = 0; for (size_t i = 0; i < modules.size(); i++) { const module* mod = dynamic_cast<module*>(modules[i]); if (mod && ((unsigned __int64)mod != m_me)) doDisconnect((unsigned __int64)mod); if (stat == 0) stat = m_stat; } m_stat = stat; } void connManager::disconnectAll() { boost::try_mutex::scoped_lock m(modulesMutex, boost::try_to_lock_t()); if (m.owns_lock()) doDisconnectAll(); else { for (int i = 0; i < 20; i++) { if (m.try_lock()) return doDisconnectAll(); ; Sleep(100); } } } } // namespace transactd } // namespace db } // namespace bzs