/*================================================================= Copyright (C) 2016 BizStation Corp All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. =================================================================*/ #pragma hdrstop #include "replCommand.h" #pragma package(smart_init) using namespace std; using namespace bzs::db::protocol::tdap; using namespace bzs::db::protocol::tdap::client; #define ER_BAD_SLAVE 26200 #define WARN_NO_MASTER_INF 26617 #define ER_UNTIL_REQUIRES_USING_GTID 26949 #define ER_SLAVE_CHANNEL_DOES_NOT_EXIST 28074 connectParams node::cp(int opt) { bool nodb = (opt & CP_NO_DB) != 0; bool noschem = nodb || ((opt & CP_NO_SCHEMA) != 0); connectParams cp1(_T("tdap"), host.c_str(), nodb ? _T("") : db.c_str(), noschem ? _T("") : schema.c_str(), user.c_str(), passwd.c_str()); return cp1; } void node::setOption(int op, bool enable) { if (enable) options |= op; else options &= ~op; } //------------------------------------------------------------------------------ struct replImpl { replicationParam& pm; replImpl(replicationParam& p) : pm(p){} virtual bool ignoreError(short stat) { return false;} virtual void setSlaveSkipError(database_ptr db, const char* /*gtid*/) { execSql(db, "set global sql_slave_skip_counter=1"); } virtual void setGtidSlavePos(database_ptr db, const binlogPos& bpos, connMgr* mgr){} virtual string stopSlaveSql() {return "stop slave";} virtual string stopAllSlaveSql() {return "stop slave";} virtual string startSlaveSql() {return "start slave";} virtual string startAllSlaveSql() {return "start slave";} virtual string resetSlaveSql() {return "reset slave";} virtual string resetSlaveAllSql() {return "reset slave all";} virtual string resetMasterSql() {return "reset master";} virtual string changeMasterSql(const binlogPos& bpos, connMgr* /*mgr*/) { char tmp[128]; _i64toa_s(bpos.pos, tmp, 128, 10); return changeMasterBaseSql() + ", master_log_file='" + string(bpos.filename) + "', master_log_pos=" + tmp; } virtual string switchMasterSql(replSlave::autoPosType) { THROW_BZS_ERROR_WITH_MSG(_T("No support switchMaster at this replication type.")); } virtual string startSlaveUntilSql(const binlogPos& bpos) { char buf[128]; _i64toa_s(bpos.pos, buf, 128, 10); return "start slave until master_log_file='" + string(bpos.filename) + "', master_log_pos=" + string(buf); } protected: std::vector<string> getGtids(char* str) { std::vector<string> gtids; char* en = str + strlen(str);; char* st = str; while (st < en) { char* p = strchr(st, ','); if (p) *p = 0x00; gtids.push_back(st); st = p ? p + 1 : en; if (*st == '\n') ++st; } return gtids; } string changeMasterBaseSql() { masterNode& mn = pm.master; string host = toUtf8(mn.host); // remove port number size_t pos = host.find(":"); if (pos != std::string::npos) host.erase(host.begin() + pos, host.end()); string s = "change master to master_host='" + host + "'"; if (mn.repPort != "") s += ", master_port=" + mn.repPort; if (mn.repUser != "") { s += ", master_user='" + mn.repUser + "'"; s += ", master_password='" + mn.repPasswd + "'"; } if (mn.repOption != "") s += "," + mn.repOption; return s; } }; struct replImplMaria : public replImpl { replImplMaria(database_ptr db, replicationParam& p) : replImpl(p) { if (pm.master.channel != "") { char tmp[256]; sprintf_s(tmp, 256, "set session default_master_connection='%s'", pm.master.channel.c_str()); execSql(db, tmp); } } bool ignoreError(short stat) { return stat == WARN_NO_MASTER_INF;} string stopAllSlaveSql() {return "stop all slaves";} string startAllSlaveSql() {return "start all slaves";} string changeMasterSql(const binlogPos& bpos, connMgr* mgr) { if (pm.type == REP_TYPE_REGACY) return replImpl::changeMasterSql(bpos, mgr); return changeMasterBaseSql() + ", master_use_gtid=slave_pos"; } string switchMasterSql(replSlave::autoPosType v) { if (v == replSlave::slave_pos) return changeMasterBaseSql() + ", master_use_gtid=slave_pos"; else return changeMasterBaseSql() + ", master_use_gtid=current_pos"; } void setGtidSlavePos(database_ptr db, const binlogPos& bpos, connMgr* mgr) { if (pm.type == REP_TYPE_REGACY) return; string gtid = getAllGtids(bpos, mgr); string s = "set global gtid_slave_pos=\"" + gtid + "\""; execSql(db, s.c_str()); } private: inline uint_td domainid(const char* gtid) {return strtoul(gtid, NULL, 10);} string getAllGtids(const binlogPos& bpos, connMgr* mgr) { string gtid = bpos.gtid; uint_td domain = domainid(gtid.c_str()); //const connMgr::records recs = mgr->channels(); connRecords_ptr recs_p = createConnRecords(mgr->channels()); const connMgr::records& recs = *recs_p.get(); validateStatus(mgr, _T("channels")); if (recs.size()) { const connMgr::records& recs2 = mgr->slaveStatus(recs[0].name); validateStatus(mgr, _T("Gtids slaveStatus")); if (recs2.size() > SLAVE_STATUS_MA_GTID_IO_POS) { char tmp[256]; recs2[SLAVE_STATUS_MA_GTID_IO_POS].value(tmp, 256); std::vector<string> gtids = getGtids(tmp); for (size_t i=0; i < gtids.size(); ++i) { if (domain != domainid(gtids[i].c_str())) gtid += string(",") + gtids[i]; } } } return gtid; } }; //------------------------------------------------------------------------------ struct replImplMy : public replImpl { replImplMy(replicationParam& p) : replImpl(p){} void setSlaveSkipError(database_ptr db, const char* gtid) { char tmp[256] = "set gtid_next = \""; strcat_s(tmp, 256, gtid); strcat_s(tmp, 256, "\""); execSql(db, tmp); execSql(db, "start transaction"); execSql(db, "commit"); execSql(db, "set gtid_next = automatic"); } string changeMasterSql(const binlogPos& bpos, connMgr* mgr) { if (pm.type == REP_TYPE_REGACY) return replImpl::changeMasterSql(bpos, mgr); return changeMasterBaseSql() + ", master_auto_position=1"; } string switchMasterSql(replSlave::autoPosType /*v*/) { return changeMasterBaseSql() + ", master_auto_position=1"; } void setGtidSlavePos(database_ptr db, const binlogPos& bpos, connMgr* mgr) { if (pm.type == REP_TYPE_REGACY || !pm.slave.option(OPT_RESET_MASTER)) return; string s = "set global gtid_purged=\"" + gtidset(bpos, mgr) + "\""; execSql(db, "reset master"); execSql(db, s.c_str()); } private: virtual string gtidset(const binlogPos& bpos, connMgr* /*mgr*/) { return bpos.gtid; } }; //------------------------------------------------------------------------------ struct replImplMy57 : public replImplMy { replImplMy57(replicationParam& p) : replImplMy(p){} bool ignoreError(short stat) { return (stat == ER_SLAVE_CHANNEL_DOES_NOT_EXIST); } string stopSlaveSql() {return "stop slave" + channelSql();} string startSlaveSql() {return "start slave" + channelSql();} string resetSlaveSql() {return "reset slave" + channelSql();} string resetSlaveAllSql() {return "reset slave all" + channelSql();} string startSlaveUntilSql(const binlogPos& bpos) { return replImpl::startSlaveUntilSql(bpos) + channelSql(); } string changeMasterSql(const binlogPos& bpos, connMgr* mgr) { return replImplMy::changeMasterSql(bpos, mgr) + channelSql(); } string switchMasterSql(replSlave::autoPosType v) { return replImplMy::switchMasterSql(v) + channelSql(); } private: string channelSql() {return " for channel '" + pm.master.channel + "'";} string uuid(string& gtid) { size_t pos = gtid.find(':'); if (pos != string::npos) return gtid.substr(0, pos); assert(0); return ""; } string gtidset(const binlogPos& bpos, connMgr* mgr) { // all channels char tmp[4096]; string gtid = bpos.gtid; strcpy_s(tmp, 4096, gtid.c_str()); std::vector<string> gds = getGtids(tmp); std::vector<string> uuids; for (size_t i=0;i<gds.size(); ++i) uuids.push_back(uuid(gds[i])); //const connMgr::records recs = mgr->channels(); connRecords_ptr recs_p = createConnRecords(mgr->channels()); const connMgr::records& recs = *recs_p.get(); validateStatus(mgr, _T("channels")); for (size_t i = 0; i < recs.size(); ++i) { if (pm.master.channel != recs[i].name) { const connMgr::records& recs2 = mgr->slaveStatus(recs[i].name); validateStatus(mgr, _T("Gtids slaveStatus")); if (recs2.size() > SLAVE_STATUS_EXECUTED_GTID_SET) { recs2[SLAVE_STATUS_EXECUTED_GTID_SET].value(tmp, 4096); std::vector<string> gtids = getGtids(tmp); for (size_t i=0; i < gtids.size(); ++i) { string uid = uuid(gtids[i]); if (find(uuids.begin(), uuids.end(), uid) == uuids.end()) gtid += string(",") + gtids[i]; } return gtid; } } } return gtid; } }; //------------------------------------------------------------------------------ // support functions //------------------------------------------------------------------------------ bool isMySqlGtidMode(connMgr* mgr) { const connMgr::records& rec = mgr->extendedvars(); return (rec.size() && (rec[TD_EXTENDED_VAR_MYSQL_GTID_MODE].longValue != 0)); } void notyfy(replicationNotify* nf, int v) { if (nf) nf->onUpdateStaus(v); } int resolv(replicationNotify* nf, int type, const connMgr::records& recs, int defValue) { if (nf) defValue = nf->onResolvError(type, recs); return defValue; } __int64 getSlaveIOPos(const connMgr::records& recs) { if (recs.size() > SLAVE_STATUS_READ_MASTER_LOG_POS) return recs[SLAVE_STATUS_READ_MASTER_LOG_POS].longValue; return 0; } std::string getSkipGtid(const connMgr::records& recs) { string s; if (recs.size() <= SLAVE_STATUS_EXECUTED_GTID_SET) return s; const char* uuid = recs[SLAVE_STATUS_MASTER_UUID].name; char buf[1024]; char* p = (char*)recs[SLAVE_STATUS_EXECUTED_GTID_SET].name; if (recs[SLAVE_STATUS_EXECUTED_GTID_SET].type == 2) p = (char*)recs[SLAVE_STATUS_EXECUTED_GTID_SET].longValue; p = strstr(p, uuid); if (!p) { // the first transaction error s = uuid; return s + ":1"; } strcpy_s(buf, MAX_PATH, p); p = buf; char* pp = strchr(p, ','); if (pp) *pp = 0x00; pp = strrchr(p, ':'); if (pp) { *pp = 0x00; s = p; s += ":"; p = pp+1; pp = strchr(p, '-'); if (pp) p = pp +1; unsigned __int64 v = _atoi64(p); ++v; _i64toa_s(v, buf, MAX_PATH, 10); s += buf; } return s; } bool isGtidAutoPos(const connMgr::records& recs) { return recs[SLAVE_STATUS_AUTO_POSITION].longValue != 0; } __int64 getSlaveExecPos(const connMgr::records& recs) { if (recs.size() > SLAVE_STATUS_EXEC_MASTER_LOG_POS) return recs[SLAVE_STATUS_EXEC_MASTER_LOG_POS].longValue; return 0; } bool isPosBrokn(const connMgr::records& recs) { __int64 iop = getSlaveIOPos(recs); __int64 exp = getSlaveExecPos(recs); return exp > iop; } bool isSamePosAsSlaveExecPos(const connMgr::records& recs, binlogPos& bpos) { return (strcmp(recs[SLAVE_STATUS_MASTER_LOG_FILE].name, bpos.filename)) == 0 && (recs[SLAVE_STATUS_EXEC_MASTER_LOG_POS].longValue == bpos.pos); } bool isSlaveSqlRunning(const connMgr::records& recs) { return strcmp(recs[SLAVE_STATUS_SLAVE_SQL_RUNNING].name, "Yes") == 0; } bool isSlaveIoRunning(const connMgr::records& recs) { return strcmp(recs[SLAVE_STATUS_SLAVE_IO_RUNNING].name, "Yes") == 0; } int getSlaveIoErrno(const connMgr::records& recs) { if (recs.size() > SLAVE_STATUS_LAST_IO_ERRNO) return (int)recs[SLAVE_STATUS_LAST_IO_ERRNO].longValue; return 0; } bool replSlave::resolvSqlError(const connMgr::records& recs, binlogPos& bpos, replicationNotify* nf) { int ret = resolv(nf, ERROR_SQL_THREAD, recs, RESOLV_RESULT_CANCEL); if (RESOLV_RESULT_CANCEL == ret) THROW_BZS_ERROR_WITH_MSG( _T("SQL thread has error(s).\nPlease retry after remove error(s).")); else if (RESOLV_RESULT_YES == ret) { skipLogEvent(getSkipGtid(recs).c_str()); startUntil(bpos); return false; }else stopAndReset(); return true; } bool replSlave::resolvIOError(const connMgr::records& recs, replicationNotify* nf) { int ret = resolv(nf, ERROR_IO_THREAD, recs, RESOLV_RESULT_CANCEL); if(RESOLV_RESULT_YES != ret) THROW_BZS_ERROR_WITH_MSG(_T("IO thread has error(s).\nPlease retry after remove error(s).")); stopAndReset(); return true; } bool replSlave::resolvBroknError(const connMgr::records& recs, replicationNotify* nf) { int ret = resolv(nf, ERROR_LOG_BROKN, recs, RESOLV_RESULT_CANCEL); if(RESOLV_RESULT_YES !=ret) THROW_BZS_ERROR_WITH_MSG(_T("Log position error.\nRebuild of replication is required.")); stopAndReset(); return true; } string toUtf8(const _tstring& s) { #ifdef _UNICODE char tmp[MAX_PATH]={0}; WideCharToMultiByte(CP_UTF8, 0, s.c_str(), -1, tmp, MAX_PATH, NULL, NULL); return tmp; #else return s; #endif } //------------------------------------------------------------------------------ // class replSlave //------------------------------------------------------------------------------ replSlave::replSlave(database_ptr db, replicationParam& pm, connMgr* mgr) : m_db(db), m_mgr(mgr), m_mysqlGtid(false) { if (pm.type == REP_TYPE_GTID_MA) m_impl = new replImplMaria(db, pm); // mariadb gtid else { btrVersions vers; db->getBtrVersion(&vers); validateStatus(db, _T("get slave version")); btrVersion ver = vers.versions[VER_IDX_DB_SERVER]; if (ver.majorVersion == 5 && ver.minorVersion < 6) m_impl = new replImpl(pm); // mysql maridb 5.5 else if (ver.isMariaDB()) m_impl = new replImplMaria(db, pm);// maridb 10.0 10.1 else { m_mysqlGtid = isMySqlGtidMode(mgr); if (!m_mysqlGtid) m_impl = new replImpl(pm); // 5.6 5.7 regacy else if (ver.minorVersion > 6) m_impl = new replImplMy57(pm);// 5.7 gtid else m_impl = new replImplMy(pm); // 5.6 gtid } } } replSlave::~replSlave() { delete m_impl; } void replSlave::validateError(const _TCHAR* msg) { if (m_db->stat() == 0 || m_impl->ignoreError(m_db->stat())) return; validateStatus(m_db, msg); } void replSlave::setSkipError(const char* gtid) { m_impl->setSlaveSkipError(m_db, gtid); } void replSlave::stop(bool all) { string s; if (all) s = m_impl->stopAllSlaveSql(); else s = m_impl->stopSlaveSql(); m_db->execSql(s.c_str()); validateError(_T("stop slave")); } void replSlave::start(bool all) { if (all) execSql(m_db, m_impl->startAllSlaveSql().c_str()); else execSql(m_db, m_impl->startSlaveSql().c_str()); } void replSlave::startNoThrow() { m_db->execSql(m_impl->startSlaveSql().c_str()); } void replSlave::reset() { string s = m_impl->resetSlaveSql(); m_db->execSql(s.c_str()); validateError(_T("reset slave")); } void replSlave::resetAll() { string s = m_impl->resetSlaveAllSql(); m_db->execSql(s.c_str()); validateError(_T("reset slave all")); } bool replSlave::startUntil(binlogPos& bpos) { m_db->execSql(m_impl->startSlaveUntilSql(bpos).c_str()); if ((m_db->stat() == ER_BAD_SLAVE) || (m_impl->pm.type == REP_TYPE_GTID_MA && m_db->stat() == ER_UNTIL_REQUIRES_USING_GTID)) return false; if (m_db->stat()) nstable::throwError(_T("start slave until"), m_db->stat()); return true; } void replSlave::changeMaster(const binlogPos* bpos) { m_impl->setGtidSlavePos(m_db, *bpos, m_mgr); execSql(m_db, m_impl->changeMasterSql(*bpos, m_mgr).c_str()); } void replSlave::switchMaster(autoPosType v) { execSql(m_db, m_impl->switchMasterSql(v).c_str()); } void replSlave::resetMaster() { execSql(m_db, "reset master"); } void replSlave::skipLogEvent(const char* gtid) { stop(one); setSkipError(gtid); } void replSlave::stopAndReset() { stop(one); reset(); } const replicationParam& replSlave::params() const { return m_impl->pm; } bool replSlave::isSlaveSync(binlogPos& bpos, replicationNotify* nf) { while (1) { const connMgr::records& recs = m_mgr->slaveStatus(m_impl->pm.master.channel.c_str()); validateStatus(m_mgr, _T("slave status")); /* In the case of first-time replication, size is zero. */ if (recs.size() == 0) return true; bool isIgnoreBrokn = isMysqlGtidMode() && isGtidAutoPos(recs); if (!isIgnoreBrokn && isPosBrokn(recs)) { if (resolvBroknError(recs, nf)) return true; } if (isSamePosAsSlaveExecPos(recs, bpos)) return true; if ((isSlaveIoRunning(recs) == false) && getSlaveIoErrno(recs)) { if (resolvIOError(recs, nf)) return true; } else if (isSlaveSqlRunning(recs) == false) { if (resolvSqlError(recs, bpos, nf)) return true; } else break; Sleep(10); } return false; } void replSlave::waitForSlaveSync(binlogPos& bpos, int waitTime, replicationNotify* nf) { Sleep(100); bool slaveSync = false; int n = waitTime * 100; for (int nn = 0;nn < 10; ++nn) { if (isSlaveSync(bpos, nf)) { slaveSync = true; break; } for (int i= 0;i < n; ++i) { notyfy(nf, REP_NF_WAIT); Sleep(10); } } if (!slaveSync) THROW_BZS_ERROR_WITH_MSG( _T("The slave SQL thread could not be executed") _T("until the target of the log position in time")); }