/*================================================================= Copyright (C) 2012 2013 BizStation Corp All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. =================================================================*/ #include "tdapCommandExecuter.h" #include "recordsetReader.h" #include #include #include #include #include //lookup for result value #include #include #include #ifdef WIN32 #include #endif #include "databaseSchema.h" #include #include namespace bzs { namespace db { using namespace engine::mysql; namespace protocol { using namespace transactd; using namespace transactd::connection; namespace tdap { namespace mysql { static const char* TableNameTitle = "dbfile="; static const char* BdfNameTitle = ".bdf"; std::string& toLowerCaseName(std::string& name , bool forSql) { if (forSql) { char tmp[MAX_PATH]; strcpy_s(tmp, MAX_PATH, name.c_str()); my_casedn_str(global_system_variables.collation_server, tmp); name = tmp; } else boost::algorithm::to_lower(name); return name; } std::string getDatabaseName(const request& req, bool forSql) { std::vector ssc ; std::vector ss ; if (req.keybuf) { std::string s((const char*)req.keybuf); split(ssc, s, "\t"); if (ssc.size()) { if (forSql && ssc.size()>1) ssc.erase(ssc.begin(), ssc.begin()+1); std::string path = ssc[0]; std::string derim = "/"; split(ss, path, derim); /* tdap://serverName/dbName?dbfile=xxx */ if (ss.size()>3) { std::vector db; split(db, ss[3], "?"); if (db.size()>0) { if (g_tableNmaeLower) toLowerCaseName(db[0], forSql); return db[0]; } } } } return ""; } std::string getTableName(const std::string& src, bool forSql) { std::vector ssc ; split(ssc, src, "\t"); if (forSql && ssc.size()>1) ssc.erase(ssc.begin(), ssc.begin()+1); std::string name = ssc[0]; size_t pos = name.find(TableNameTitle); if (pos != std::string::npos) { pos += strlen(TableNameTitle); while ((name[pos] == '/')||(name[pos] == '\\')) { if (++pos == name.size()) return ""; } size_t pos2 = name.find("." , pos); if (pos2 == std::string::npos) pos2 = name.size(); size_t pos3 = name.find("&" , pos); if (pos3 != std::string::npos && (pos3 < pos2)) pos2 = pos3; name = name.substr(pos, pos2 - pos); if (g_tableNmaeLower) toLowerCaseName(name, forSql); return name; } return ""; } std::string getTableName(const request& req, bool forSql) { if (req.keybuf) { std::string s((const char*)req.keybuf, req.keylen); return getTableName(s, forSql); } return ""; } const char* getOwnerName(const request& req) { const char* p = (const char*)req.data; if (*req.datalen && (p[*req.datalen-1] == 0x00)) return p; return ""; } void dumpStdErr(int op, request& req, table* tb) { boost::scoped_array msg(new char[1024]); sprintf_s(msg.get(),1024, "[Transactd] Exception:op=%d Handle=%d tablename=%s datalen=%d" " keynum=%d keylen=%d \n" , op, req.pbk->handle , tb ? tb->name().c_str() : "" , *req.datalen , req.keyNum, req.keylen); sql_print_error("%s", msg.get()); //dump Keybuf if (req.keylen && req.keybuf) { sprintf_s(msg.get(),1024,"[Transactd] Dump key buffer\n"); sql_print_error("%s",msg.get()); char* p = (char*)req.keybuf; bzs::rtl::debuglog::dump(stderr, (char*)req.keybuf, req.keylen, 256); } //dump databuffer if (*req.datalen && req.data) { sprintf_s(msg.get(),1024, "[Transactd] Dump data buffer\n"); sql_print_error("%s",msg.get()); bzs::rtl::debuglog::dump(stderr, (char*)req.data, (int)*req.datalen, 256); } } //------------------------------------------------------------- // class dbExecuter //------------------------------------------------------------- dbExecuter::dbExecuter():dbManager(),m_readHandler(new ReadRecordsHandler()) ,m_blobBuffer(new blobBuffer()) { } dbExecuter::~dbExecuter() { delete m_blobBuffer; delete m_readHandler; } void dbExecuter::connect(request& req) { req.paramMask = 0; if (req.keyNum == LG_SUBOP_DISCONNECT) releaseDatabse(req.cid); else { std::string dbname = getDatabaseName(req); std::string dbSqlname = getDatabaseName(req, FOR_SQL); //exec SQL use database if (dbname != "") { database* db = getDatabase(dbname.c_str(), req.cid); dbSqlname.insert(0, "use "); req.result = ddl_execSql(db->thd(), dbSqlname); } } } void dbExecuter::releaseDatabase(request& req, int op) { req.paramMask = 0; if ((op == TD_RESET_CLIENT)&&(req.keyNum !=0)) req.result = 1; else releaseDatabse(req.cid); } std::string dbExecuter::makeSQLcreateTable(const request& req) { return (const char*)req.data; //return sql; } int dbExecuter::errorCode(int ha_error) {//see mysqld_error.h or my_base.h or dbManager.h if (ha_error < HA_ERR_FIRST) return ha_error; else if (ha_error==HA_ERR_KEY_NOT_FOUND) return STATUS_NOT_FOUND_TI; else if (ha_error == HA_ERR_END_OF_FILE) return STATUS_EOF; else if (ha_error == HA_ERR_FOUND_DUPP_KEY) return STATUS_DUPPLICATE_KEYVALUE; else if(ha_error == HA_ERR_CRASHED_ON_USAGE) return STATUS_IO_ERROR; else if (ha_error ==HA_ERR_NO_ACTIVE_RECORD) return STATUS_NO_CURRENT; else if ((ha_error == HA_ERR_LOCK_WAIT_TIMEOUT) ||(ha_error==HA_ERR_LOCK_DEADLOCK)||(ha_error == ER_LOCK_WAIT_TIMEOUT)) return STATUS_LOCK_ERROR; else if (ha_error == STATUS_INVALID_LOCKTYPE) return STATUS_INVALID_LOCKTYPE; else if(ha_error==HA_ERR_AUTOINC_ERANGE) return STATUS_DUPPLICATE_KEYVALUE; else if (ha_error==ER_PARSE_ERROR) return STATUS_CANT_CREATE; else if(ha_error == ER_TABLE_EXISTS_ERROR) return STATUS_TABLE_EXISTS_ERROR; else if (ha_error == DBM_ERROR_TABLE_USED) return STATUS_CANNOT_LOCK_TABLE; return 25000 + ha_error; } bool isMetaDb(const request& req) { char buf[MAX_PATH]; strncpy(buf, (char*)req.keybuf, MAX_PATH); _strlwr(buf); char_m* st = _mbsstr((char_m*)buf, (char_m*)BdfNameTitle); if (st==NULL) st = (char_m*)strstr(buf, TRANSACTD_SCHEMANAME); return (st != NULL); } inline void dbExecuter::doCreateTable(request& req) { // if table name is mata table and database is nothing // then cretate database too. std::string dbname = getDatabaseName(req); std::string dbSqlname = getDatabaseName(req, FOR_SQL); if (dbname != "") { std::string cmd; database* db = getDatabase(dbname.c_str(), req.cid); if (isMetaDb(req)) {//for database operation if ((req.keyNum == 0) && (db->existsDatabase()==false)) req.result = ddl_createDataBase(db->thd(), dbSqlname); else if (req.keyNum == CR_SUBOP_DROP) { std::string tableName = getTableName(req); if (db->existsTable(tableName)) req.result = ddl_dropTable(db, tableName, dbSqlname, getTableName(req, FOR_SQL)); if (req.result == 0) req.result = ddl_dropDataBase(db->thd(), dbname, dbSqlname); req.result = errorCodeSht(req.result); return; } } if (req.result==0) {//table operation if ((req.result = ddl_useDataBase(db->thd(), dbSqlname))==0) { std::string tableSqlName = getTableName(req, FOR_SQL); std::string tableName = getTableName(req); if (req.keyNum == CR_SUBOP_DROP)// -128 is delete { if (db->existsTable(tableName)) req.result = ddl_dropTable(db, tableName, dbSqlname, tableSqlName); } else if (req.keyNum == CR_SUBOP_RENAME) {//rename new is keybuf request reqold; reqold.keybuf = req.data; reqold.keylen = *req.datalen; req.result = ddl_renameTable(db , getTableName(reqold)/*oldname*/ , dbSqlname , getTableName(reqold, FOR_SQL)/*oldname*/ , tableSqlName/*newName*/); } else if (req.keyNum == CR_SUBOP_SWAPNAME) {//swap name name2 = keybuf request reqold; reqold.keybuf = req.data; reqold.keylen = *req.datalen; req.result = ddl_replaceTable(db , getTableName(reqold)/*oldname*/ , tableName /*newName*/ , dbSqlname , getTableName(reqold, FOR_SQL)/*oldname*/ , tableSqlName /*newName*/); } else {//create if (req.data==NULL) req.result = 1; else { //-1 is overwrite if ((req.keyNum == CR_SUB_FLAG_EXISTCHECK) && (db->existsTable(tableName))) req.result = ddl_dropTable(db, tableName, dbSqlname, tableSqlName); if (req.result == 0) req.result = ddl_execSql(db->thd(), makeSQLcreateTable(req)); } } } } } else req.result = 1; req.result = errorCodeSht(req.result); } //open table and assign handle inline void dbExecuter::doOpenTable(request& req) { std::string dbname = getDatabaseName(req); if (dbname != "") { database* db = getDatabase(dbname.c_str(), req.cid); m_tb = db->openTable(getTableName(req), req.keyNum, getOwnerName(req));// if error occured that throw exception req.result = db->stat(); if (m_tb) { req.pbk->handle = addHandle(getDatabaseID(req.cid), m_tb->id()); m_tb = getTable(req.pbk->handle); m_tb->setBlobBuffer(m_blobBuffer); req.paramMask = P_MASK_POSBLK; } } else req.result = 1; } inline void readAfter(request& req, table* tb, dbExecuter* dbm) { if (tb->stat()==0) { if ((req.op>=TD_KEY_EQUAL_KO)&& (req.op <= TD_KEY_LAST_KO)) req.paramMask = P_MASK_POSBLK; else { req.paramMask = P_MASK_READRESULT; if (tb->blobFields()) req.paramMask |=P_MASK_BLOBBODY; req.data = tb->record(); } } req.result = dbm->errorCodeSht(tb->stat()); } inline void dbExecuter::doSeekKey(request& req, int op) { bool read = true; m_tb = getTable(req.pbk->handle); if (m_tb->setKeyNum(m_tb->keyNumByMakeOrder(req.keyNum))) { m_tb->setKeyValuesPacked((const uchar*)req.keybuf, req.keylen); if (read) { ha_rkey_function flag; if (op == TD_KEY_SEEK) flag = HA_READ_KEY_EXACT; else if (op == TD_KEY_AFTER) flag = HA_READ_AFTER_KEY; else if (op == TD_KEY_OR_AFTER) flag = HA_READ_KEY_OR_NEXT; else if (op == TD_KEY_BEFORE) flag = HA_READ_BEFORE_KEY; else if (op == TD_KEY_OR_BEFORE) flag = HA_READ_KEY_OR_PREV; m_tb->seekKey(flag); } } readAfter(req, m_tb, this); } inline void dbExecuter::doMoveFirst(request& req) { m_tb = getTable(req.pbk->handle); if (m_tb->setKeyNum(m_tb->keyNumByMakeOrder(req.keyNum))) { if (m_tb->isNisKey(m_tb->keyNum())) { m_tb->clearKeybuf(); m_tb->seekKey(HA_READ_KEY_OR_NEXT); }else m_tb->getFirst(); } readAfter(req, m_tb, this); } inline void dbExecuter::doMoveKey(request& req, int op) { m_tb = getTable(req.pbk->handle); if (m_tb->setKeyNum(m_tb->keyNumByMakeOrder(req.keyNum))) { if (op== TD_KEY_FIRST) m_tb->getFirst(); else if (op== TD_KEY_LAST) m_tb->getLast(); else if (op== TD_KEY_NEXT) m_tb->getNext(); else if (op== TD_KEY_PREV) m_tb->getPrev(); } readAfter(req, m_tb, this); } inline int dbExecuter::doReadMultiWithSeek(request& req, int op, char* resultBuffer , size_t& size, netsvc::server::buffers* optionalData) { int ret = 1; m_tb = getTable(req.pbk->handle); char keynum = m_tb->keyNumByMakeOrder(req.keyNum); if (m_tb->setKeyNum(keynum)) { m_tb->setKeyValuesPacked((const uchar*)req.keybuf, req.keylen); m_tb->seekKey((op == TD_KEY_GE_NEXT_MULTI) ? HA_READ_KEY_OR_NEXT : HA_READ_KEY_OR_PREV); extRequest* ereq = (extRequest*)req.data; req.result = m_readHandler->begin(m_tb, ereq, true , resultBuffer, RETBUF_EXT_RESERVE_SIZE, *req.datalen, (op == TD_KEY_GE_NEXT_MULTI)); if (req.result != 0) return 1; if (m_tb->stat() == 0) { if (op == TD_KEY_GE_NEXT_MULTI) m_tb->getNextExt(m_readHandler, true); else if (op == TD_KEY_LE_PREV_MULTI) m_tb->getPrevExt(m_readHandler, true); } req.result = errorCodeSht(m_tb->stat()); DEBUG_WRITELOG2(op, req); size = req.serializeForExt(m_tb, resultBuffer, m_readHandler->end()); if ((req.paramMask & P_MASK_BLOBBODY) && m_blobBuffer->fieldCount()) size = req.serializeBlobBody(m_blobBuffer, resultBuffer, FILE_MAP_SIZE, optionalData); DEBUG_PROFILE_END_OP(1, op) ret = EXECUTE_RESULT_SUCCESS; } if (m_tb)m_tb->unUse(); return ret; } inline int dbExecuter::doReadMulti(request& req, int op, char* resultBuffer , size_t& size, netsvc::server::buffers* optionalData) { int ret = 1; m_tb = getTable(req.pbk->handle); extRequest* ereq = (extRequest*)req.data; bool incCurrent = !((ereq->type[0]=='E') && (ereq->type[1]=='G')); bool forword = (op == TD_KEY_NEXT_MULTI) || (op == TD_POS_NEXT_MULTI); req.result = m_readHandler->begin(m_tb, ereq,(op != TD_KEY_SEEK_MULTI) , resultBuffer, RETBUF_EXT_RESERVE_SIZE, *req.datalen, forword); if (req.result == 0) { if (op == TD_KEY_SEEK_MULTI) { char keynum = m_tb->keyNumByMakeOrder(req.keyNum); if (m_tb->setKeyNum(keynum)) req.result = errorCodeSht(seekEach(ereq)); else { if (m_tb)m_tb->unUse(); return ret; } } else { if (op == TD_KEY_NEXT_MULTI) m_tb->getNextExt(m_readHandler, incCurrent); else if (op == TD_KEY_PREV_MULTI) m_tb->getPrevExt(m_readHandler, incCurrent); else if (op == TD_POS_NEXT_MULTI) m_tb->stepNextExt(m_readHandler, incCurrent); else if (op == TD_POS_PREV_MULTI) m_tb->stepPrevExt(m_readHandler, incCurrent); req.result = errorCodeSht(m_tb->stat()); } DEBUG_WRITELOG2(op, req); size = req.serializeForExt(m_tb, resultBuffer, m_readHandler->end()); if ((req.paramMask & P_MASK_BLOBBODY) && m_blobBuffer->fieldCount()) size = req.serializeBlobBody(m_blobBuffer, resultBuffer, FILE_MAP_SIZE, optionalData); DEBUG_PROFILE_END_OP(1, op) ret = EXECUTE_RESULT_SUCCESS; } if (m_tb)m_tb->unUse(); return ret; } inline short dbExecuter::seekEach(extRequest* ereq) { short stat = 0; logicalField* fd = &ereq->field; for (int i=0;ilogicalCount;++i) { m_tb->setKeyValuesPacked(fd->ptr, fd->len); m_tb->seekKey(HA_READ_KEY_EXACT); if (m_tb->stat() == 0) stat = m_readHandler->write(m_tb->position(), m_tb->posPtrLen()); else stat = m_readHandler->write(NULL, m_tb->posPtrLen(), errorCodeSht(m_tb->stat())); if (stat) break; fd = fd->next(); } if (stat==0) stat = STATUS_REACHED_FILTER_COND; return stat; } inline void dbExecuter::doStepRead(request& req, int op) { m_tb = getTable(req.pbk->handle); if (op == TD_POS_FIRST) m_tb->stepFirst(); else if (op == TD_POS_LAST) m_tb->stepLast(); else if (op == TD_POS_NEXT) m_tb->stepNext(); else if (op == TD_POS_PREV) m_tb->stepPrev(); readAfter(req, m_tb, this); } inline void dbExecuter::doInsert(request& req) { m_tb = getTable(req.pbk->handle, SQLCOM_INSERT); bool ncc = (req.keyNum==-1); if (!ncc) { if (!m_tb->setKeyNum(m_tb->keyNumByMakeOrder(req.keyNum))) { req.result = errorCodeSht(m_tb->stat()); return; } } m_tb->clearBuffer(); m_tb->setRecordFromPacked((const uchar*)req.data, *(req.datalen), req.blobHeader); smartBulkInsert sbi(m_tb, 1); __int64 aincValue = m_tb->insert(ncc); req.result = errorCodeSht(m_tb->stat()); if (aincValue) { req.paramMask = P_MASK_INS_AUTOINC; req.data = m_tb->record(); } else req.paramMask = P_MASK_POSBLK|P_MASK_KEYBUF; } inline void dbExecuter::doUpdateKey(request& req) { m_tb = getTable(req.pbk->handle, SQLCOM_UPDATE); char keynum = m_tb->keyNumByMakeOrder(req.keyNum); if (m_tb->setKeyNum(keynum)) { m_tb->setKeyValuesPacked((const uchar*)req.keybuf, req.keylen); m_tb->seekKey(HA_READ_KEY_EXACT); if (m_tb->stat() == 0) { m_tb->beginUpdate(keynum); if (m_tb->stat()==0) { m_tb->setRecordFromPacked((const uchar*)req.data, *(req.datalen), req.blobHeader); m_tb->update(true); } } } req.result = errorCodeSht(m_tb->stat()); req.paramMask = P_MASK_POSBLK|P_MASK_KEYBUF; } inline void dbExecuter::doUpdate(request& req) { m_tb = getTable(req.pbk->handle, SQLCOM_UPDATE); bool ncc = (req.keyNum==-1); m_tb->beginUpdate(req.keyNum); if (m_tb->stat()==0) { m_tb->setRecordFromPacked((const uchar*)req.data, *(req.datalen), req.blobHeader); m_tb->update(ncc); } req.result = errorCodeSht(m_tb->stat()); req.paramMask = P_MASK_POSBLK|P_MASK_KEYBUF; } inline void dbExecuter::doDeleteKey(request& req) { m_tb = getTable(req.pbk->handle, SQLCOM_DELETE); char keynum = m_tb->keyNumByMakeOrder(req.keyNum); if (m_tb->setKeyNum(keynum)) { m_tb->setKeyValuesPacked((const uchar*)req.keybuf, req.keylen); m_tb->seekKey(HA_READ_KEY_EXACT); if (m_tb->stat() == 0) { m_tb->beginDel(); if (m_tb->stat()==0) m_tb->del(); } } req.result = errorCodeSht(m_tb->stat()); req.paramMask = P_MASK_POSBLK; } inline void dbExecuter::doDelete(request& req) { m_tb = getTable(req.pbk->handle, SQLCOM_DELETE); m_tb->beginDel(); if (m_tb->stat()==0) m_tb->del(); req.result = errorCodeSht(m_tb->stat()); req.paramMask = P_MASK_POSBLK; } inline void dbExecuter::doInsertBulk(request& req) { m_tb = getTable(req.pbk->handle, SQLCOM_INSERT); if (m_tb->setKeyNum(m_tb->keyNumByMakeOrder(req.keyNum))) { ushort_td* n = (ushort_td*)req.data; ushort_td ret = 0; const uchar* pos = (const uchar*)req.data + sizeof(ushort_td); int ins_rows = 0; BUILINSERT_SCOPE { smartBulkInsert sbi(m_tb, *n); for (ushort_td i=0;i<*n;i++) { ushort_td len = *((ushort_td*)pos); if (pos + len > (const uchar*)req.data + *req.datalen) { ret = STATUS_BUFFERTOOSMALL; break; } else { m_tb->clearBuffer(); m_tb->setRecordFromPacked(pos + sizeof(ushort_td), len, req.blobHeader); if (i==*n-1) m_tb->insert((req.keyNum != -1)); else m_tb->insert(true); ret = m_tb->stat(); } if (ret==0) ins_rows++; DEBUG_INSERT(m_tb, pos + sizeof(ushort_td), len, i, i+1-ins_rows) pos += len + sizeof(ushort_td); } *n = ins_rows; } req.result = errorCodeSht(ret); req.resultLen = 4; req.paramMask = P_MASK_READ_EXT; }else req.result = errorCodeSht(m_tb->stat()); } inline void dbExecuter::doStat(request& req) { m_tb = getTable(req.pbk->handle); req.paramMask = P_MASK_STAT; req.resultLen = *req.datalen; if (req.resultLen >= 6 + sizeof(uint)) { ushort_td len = (ushort_td)(m_tb->recordLenCl()); if (m_tb->recordFormatType() == RF_FIXED_PLUS_VALIABLE_LEN) len -= m_tb->lastVarFiled()->pack_length() - m_tb->lastVarLenBytes(); memcpy((char*)req.data, &len, sizeof(ushort_td)); uint rows = (uint)m_tb->recordCount((req.keyNum!=0)); memcpy((char*)req.data+6, &rows, sizeof(uint)); }else req.result = STATUS_BUFFERTOOSMALL; } inline short getTrnsactionType(int op) { if (op > PARALLEL_TRN) //1000 op -= PARALLEL_TRN; if (op > NOWAIT_WRITE) //500 op -= NOWAIT_WRITE; if (op >= 300) return TRN_RECORD_LOCK_MUILTI; return TRN_RECORD_LOCK_SINGLE; } int dbExecuter::commandExec(request& req, char* resultBuffer, size_t& size, netsvc::server::buffers* optionalData) { DEBUG_PROFILE_START(1) m_tb = NULL; int op = req.op % 100; int opTrn = req.op; if (op==99) return 0; if ((op>=TD_KEY_EQUAL_KO)&& (op <= TD_KEY_LAST_KO)) op -= 50; try { posblk* pbk = req.pbk; posblk tmp; bool transactionResult = false; if (pbk==NULL) pbk = &tmp; req.reset(); switch(op) { case TD_GETSERVER_CHARSET: req.data = (void*) global_system_variables.collation_server->csname; req.resultLen = (uint_td)strlen( global_system_variables.collation_server->csname); req.paramMask |= P_MASK_DATA|P_MASK_DATALEN; break; case TD_CONNECT: connect(req); break; case TD_RESET_CLIENT: case TD_STOP_ENGINE: //close all table releaseDatabase(req, op); break; case TD_AUTOMEKE_SCHEMA: m_tb = getTable(req.pbk->handle, SQLCOM_INSERT); req.result = schemaBuilder().execute(getDatabaseCid(req.cid), m_tb); break; case TD_CREATETABLE: doCreateTable(req); break; case TD_OPENTABLE: doOpenTable(req); break; case TD_CLOSETABLE: m_tb = getTable(req.pbk->handle); if (m_tb) { m_tb->close(); m_tb=NULL; } break; case TD_KEY_SEEK: case TD_KEY_AFTER: case TD_KEY_OR_AFTER: case TD_KEY_BEFORE: case TD_KEY_OR_BEFORE: doSeekKey(req, op); break; case TD_KEY_FIRST: doMoveFirst(req); break; case TD_KEY_PREV: case TD_KEY_LAST: case TD_KEY_NEXT: doMoveKey(req, op); break; case TD_REC_INSERT: doInsert(req); break; case TD_REC_UPDATEATKEY: doUpdateKey(req); break; case TD_REC_UPDATE: doUpdate(req); break; case TD_REC_DELLETEATKEY: doDeleteKey(req); break; case TD_REC_DELETE: doDelete(req); break; case TD_BEGIN_TRANSACTION: transactionResult = getDatabaseCid(req.cid)->beginTrn(getTrnsactionType(opTrn)); break; case TD_END_TRANSACTION: transactionResult = getDatabaseCid(req.cid)->commitTrn(); break; case TD_ABORT_TRANSACTION: transactionResult = getDatabaseCid(req.cid)->abortTrn(); break; case TD_BEGIN_SHAPSHOT: transactionResult = getDatabaseCid(req.cid)->beginSnapshot(); break; case TD_END_SNAPSHOT: transactionResult = getDatabaseCid(req.cid)->endSnapshot(); break; case TD_TABLE_INFO: //support recordlen and recordCount only. doStat(req); break; case TD_POS_FIRST: case TD_POS_LAST: case TD_POS_NEXT: case TD_POS_PREV: doStepRead(req, op); break; case TD_BOOKMARK: m_tb = getTable(req.pbk->handle); req.paramMask = P_MASK_MOVPOS; req.data = (void*)m_tb->position(); req.resultLen = m_tb->posPtrLen(); break; case TD_MOVE_BOOKMARK: m_tb = getTable(req.pbk->handle); m_tb->movePos((uchar*)req.data, m_tb->keyNumByMakeOrder(req.keyNum)); readAfter(req, m_tb, this); break; case TD_GETDIRECTORY: { database* db = getDatabaseCid(req.cid); if (db->name().size() < 64) { req.keylen = (uchar_td)db->name().size()+1; req.keybuf = (void*)db->name().c_str(); req.paramMask = P_MASK_KEYBUF; }else req.result = STATUS_BUFFERTOOSMALL; break; } case TD_VERSION: if (*req.datalen >= sizeof(version)*3) { version* v = (version*)req.data; ++v; v->majorVersion = MYSQL_VERSION_ID/10000; v->minorVersion = (MYSQL_VERSION_ID/100) % 100; v->Type = 'M'; ++v; v->majorVersion = TRANSACTD_VER_MAJOR; v->minorVersion = TRANSACTD_VER_MINOR; v->Type = 'T'; req.paramMask = P_MASK_DATA|P_MASK_DATALEN; req.resultLen = sizeof(version)*3; }else req.result = STATUS_BUFFERTOOSMALL; break; case TD_CLEAR_OWNERNAME: req.keybuf = (void_td*)""; case TD_SET_OWNERNAME: { database* db = getDatabaseCid(req.cid); m_tb = getTable(req.pbk->handle); int num = (req.keyNum >1) ? req.keyNum -2: req.keyNum; num += '0'; std::string s("%@%"); s += (const char*)# s += (const char*)req.keybuf; req.result = ddl_execSql(db->thd(), makeSQLChangeTableComment(db->name(), m_tb->name(), s.c_str())); break; } case TD_DROP_INDEX: { //Key name of multi byte charctord is not supported. Use only ascii. database* db = getDatabaseCid(req.cid); m_tb = getTable(req.pbk->handle); req.result = ddl_execSql(db->thd(), makeSQLDropIndex(db->name(), m_tb->name(), m_tb->keyName(m_tb->keyNumByMakeOrder(req.keyNum)))); break; } case TD_KEY_GE_NEXT_MULTI: case TD_KEY_LE_PREV_MULTI: if (doReadMultiWithSeek(req, op, resultBuffer, size, optionalData) == EXECUTE_RESULT_SUCCESS) return EXECUTE_RESULT_SUCCESS; // Caution Call unUse() break; case TD_KEY_SEEK_MULTI: case TD_KEY_NEXT_MULTI: case TD_KEY_PREV_MULTI: case TD_POS_NEXT_MULTI: case TD_POS_PREV_MULTI: if (doReadMulti(req, op, resultBuffer, size, optionalData) == EXECUTE_RESULT_SUCCESS) return EXECUTE_RESULT_SUCCESS; // Caution Call unUse() break; case TD_MOVE_PER: m_tb = getTable(req.pbk->handle); if (m_tb->setKeyNum(m_tb->keyNumByMakeOrder(req.keyNum))) m_tb->getByPercentage(*((ushort_td*)req.data)); readAfter(req, m_tb, this); break; case TD_GET_PER: m_tb = getTable(req.pbk->handle); m_tb->calcPercentage(); req.result = errorCodeSht(m_tb->stat()); if (m_tb->stat()==0) { req.paramMask = P_MASK_DATA|P_MASK_DATALEN; req.data = m_tb->percentResult(); req.resultLen = sizeof(int); } break; case TD_INSERT_BULK: doInsertBulk(req); break; } if (m_tb)m_tb->unUse(); DEBUG_WRITELOG2(op, req) size = req.serialize(m_tb, resultBuffer); if ((req.result==0) && (req.paramMask & P_MASK_BLOBBODY) && m_blobBuffer->fieldCount()) size = req.serializeBlobBody(m_blobBuffer, resultBuffer, FILE_MAP_SIZE, optionalData); if (transactionResult) { if ((op == TD_BEGIN_TRANSACTION)||(op ==TD_BEGIN_SHAPSHOT)) return EXECUTE_RESULT_FORCSE_SYNC; return EXECUTE_RESULT_FORCSE_ASYNC; } DEBUG_PROFILE_END_OP(1, op) return EXECUTE_RESULT_SUCCESS; } catch(bzs::rtl::exception &e) { clenupNoException(); req.reset(); const int* code = getCode(e); if(code) req.result = *code; else { req.result = 20000; sql_print_error("%s", boost::diagnostic_information(e).c_str()); } printErrorMessage(code, getMsg(e)); } catch(...) { clenupNoException(); req.reset(); req.result = 20001; dumpStdErr(op, req, m_tb); try { if (m_tb) m_tb->close(); } catch(...){} } DEBUG_WRITELOG3(op, req, true); req.paramMask = 0; size = req.serialize(NULL, resultBuffer); return EXECUTE_RESULT_SUCCESS; } // --------------------------------------------------------------------------- // class connMgrExecuter // --------------------------------------------------------------------------- connMgrExecuter::connMgrExecuter(request& req, unsigned __int64 parent) :m_req(req),m_modHandle(parent) { } int connMgrExecuter::read(char* buf, size_t& size) { connManager st(m_modHandle); unsigned __int64* mod = (unsigned __int64*)m_req.keybuf; const connManager::records& records = st.getRecords(mod[0], (int)mod[1]); m_req.reset(); m_req.paramMask = P_MASK_DATA|P_MASK_DATALEN; if (records.size()) m_req.data = (void*)&records[0]; else m_req.paramMask = P_MASK_DATALEN; m_req.resultLen = (uint_td)(sizeof(record) * records.size()); size = m_req.serialize(NULL, buf); return EXECUTE_RESULT_SUCCESS; } int connMgrExecuter::disconnectOne(char* buf, size_t& size) { connManager st(m_modHandle); unsigned __int64* mod = (unsigned __int64*)m_req.keybuf; st.disconnect(*mod); m_req.reset(); size = m_req.serialize(NULL, buf); return EXECUTE_RESULT_SUCCESS; } int connMgrExecuter::disconnectAll(char* buf, size_t& size) { connManager st(m_modHandle); st.disconnectAll(); m_req.reset(); size = m_req.serialize(NULL, buf); return EXECUTE_RESULT_SUCCESS; } int connMgrExecuter::commandExec(char* buf, size_t& size) { if (m_req.keyNum == TD_STSTCS_READ) return read(buf, size); else if (m_req.keyNum == TD_STSTCS_DISCONNECT_ONE) return disconnectOne( buf, size); else if (m_req.keyNum == TD_STSTCS_DISCONNECT_ALL) return disconnectAll(buf, size); return 0; } // --------------------------------------------------------------------------- // class commandExecuter // --------------------------------------------------------------------------- commandExecuter::commandExecuter(__int64 parent):m_modHandle(parent) { m_dbExec.reset(new dbExecuter()); } commandExecuter::~commandExecuter() { m_dbExec.reset(); } size_t commandExecuter::perseRequestEnd(const char* p, size_t transfered, bool& comp)const { if (transfered < sizeof(unsigned int)) return 0; size_t size = *((unsigned int*)p); if (size == transfered) comp = true; return size; } bool commandExecuter::parse(const char* p, size_t size) { const char* end = p + size; memset(&m_req, 0, sizeof(request)); m_req.parse(p); return 0; } }//namespace mysql }//namespace tdap }//namespace protocol }//namespace db }//namespace bzs