/*================================================================= Copyright (C) 2013 BizStation Corp All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. =================================================================*/ #include "hsCommandExecuter.h" #include #include #include #include //lookup for result value #include #include #include #include #ifdef USE_HANDLERSOCKET namespace bzs { namespace db { using namespace engine::mysql; namespace protocol { using namespace transactd; using namespace transactd::connection; namespace hs { //----------------------------------------------------------------------- // request //----------------------------------------------------------------------- ha_rkey_function request::seekFlag() { switch (table.key.logical) { case HS_LG_EQUAL: return HA_READ_KEY_EXACT; case HS_LG_GREATER: return HA_READ_AFTER_KEY; case HS_LG_GREATEROREQUAL: return HA_READ_KEY_OR_NEXT; case HS_LG_LESS: return HA_READ_BEFORE_KEY; case HS_LG_LESSOREQUAL: return HA_READ_KEY_OR_PREV; } return HA_READ_KEY_EXACT; } bool request::naviSame() { return (table.key.logical == HS_LG_EQUAL); } bool request::naviForward() { return (table.key.logical == HS_LG_GREATER) || (table.key.logical == HS_LG_GREATEROREQUAL); } void seriarizeRecordsetBegin(engine::mysql::table* tb, resultBuffer& buf, bool sendBforeValue) { if (sendBforeValue) { buf.append("0\t"); buf.append((int)tb->useFields().size()); } } void seriarizeRecord(engine::mysql::table* tb, resultBuffer& buf, bool sendBforeValue) { if (sendBforeValue) { for (int i = 0; i < (int)tb->useFields().size(); i++) { buf.append("\t"); int size; const char* p = tb->valStr(tb->useFields()[i], size); if (size == -1) buf.append("\0"); else buf.append(p, size); } } } void seriarizeRecordsetEnd(engine::mysql::table* tb, resultBuffer& buf, int count, bool sendBforeValue) { if (!sendBforeValue) { buf.append("0\t1\t"); buf.append(count); buf.append("\n"); } else buf.append("\n"); } void writeError(int error, resultBuffer& buf, const char* msg) { buf.append(error); buf.append("\t"); buf.append(msg); buf.append("\n"); } void writeStatus(int stat, resultBuffer& buf, int option) { int v = 0; v = std::min(stat, 1); buf.append(v); buf.append("\t"); buf.append(option); buf.append("\n\0"); } //----------------------------------------------------------------------- // clas dbExecuter //----------------------------------------------------------------------- #define READ_SUCCESS 0 #define FILTER_MATCH READ_SUCCESS #define FILTER_SKIP 1 #define FILTER_BREAK 2 int update(request& req, engine::mysql::table* tb, int type) { tb->beginUpdate(tb->keyNum()); tb->setUseValues(req.table.values, type); tb->update(true); return tb->stat(); } int del(request& req, engine::mysql::table* tb, int type) { tb->beginDel(); tb->del(); return tb->stat(); } inline bool match(short logical, int v) { switch (logical) { case HS_LG_EQUAL: return (v == 0); //== case HS_LG_GREATER: return (v > 0); //> case HS_LG_LESS: return (v < 0); //< case HS_LG_NOTEQUAL: return (v != 0); //!= case HS_LG_GREATEROREQUAL: return (v >= 0); //>= case HS_LG_LESSOREQUAL: return (v <= 0); //<= } return false; } int filter(engine::mysql::table* tb, request::table::filters_type& filters) { for (size_t i = 0; i < filters.size(); ++i) { request::table::filter& f = filters[i]; if (f.col >= (int)tb->useFields().size()) THROW_BZS_ERROR_WITH_MSG("fcol"); Field* fd = tb->field(tb->useFields()[f.col]); uchar tmp[1024] = { 0 }; if (fd->pack_length() > 1024) THROW_BZS_ERROR_WITH_MSG("field_length > 1024"); memcpy(tmp, fd->ptr, fd->pack_length()); tb->setValue(fd->field_index, f.value, 0); int v = fd->cmp(tmp, fd->ptr); memcpy(fd->ptr, tmp, fd->pack_length()); if (!match(f.logical, v)) return (f.type == 'F') ? FILTER_SKIP : FILTER_BREAK; } return FILTER_MATCH; } inline int type(request& req) { if (req.op == HS_OP_UPDATE_INC) return UPDATE_INC; if (req.op == HS_OP_UPDATE_DEC) return UPDATE_DEC; return UPDATE_REPLACE; } inline int dbExecuter::readAfter(request& req, engine::mysql::table* tb, resultBuffer& buf, changeFunc func) { int ret = filter(tb, req.table.filters); if (ret == FILTER_MATCH) { if (req.result.offset == 0) { seriarizeRecord(tb, buf, req.result.returnBeforeValue); if (func) func(req, tb, type(req)); return READ_SUCCESS; } --req.result.offset; return FILTER_SKIP; } return ret; } inline void setKeyValues(request& req, engine::mysql::table* tb, int index) { if (req.table.in.keypart >= tb->keyDef(tb->keyNum()).user_defined_key_parts) THROW_BZS_ERROR_WITH_MSG("icol"); tb->clearBuffer(); if (index != -1) tb->setKeyValues(req.table.key.values, req.table.in.keypart, &(req.table.in.values[index])); else tb->setKeyValues(req.table.key.values, -1); } dbExecuter::dbExecuter(netsvc::server::IAppModule* mod) :engine::mysql::dbManager(mod){} void dbExecuter::doRecordOperation(request& req, engine::mysql::table* tb, resultBuffer& buf, changeFunc func) { int count = 0; int curInValueIndex = req.table.in.values.size() ? 0 : -1; int offset = req.result.offset; int filterResult = 0; seriarizeRecordsetBegin(tb, buf, req.result.returnBeforeValue); setKeyValues(req, tb, curInValueIndex); if (curInValueIndex >= 0) ++curInValueIndex; tb->seekKey(req.seekFlag(), tb->keymap()); if (tb->stat() == 0) { filterResult = readAfter(req, tb, buf, func); if (filterResult == READ_SUCCESS) ++count; } if (((tb->stat() == 0) && (filterResult != FILTER_BREAK)) || (curInValueIndex > -1)) { for (int i = 1; i < req.result.limit + offset; i++) { if (curInValueIndex > -1) { if (curInValueIndex >= (int)req.table.in.values.size()) break; setKeyValues(req, tb, curInValueIndex++); tb->seekKey(req.seekFlag(), tb->keymap()); } else { if (req.naviSame()) tb->getNextSame(tb->keymap()); else if (req.naviForward()) tb->getNext(); else tb->getPrev(); } if (tb->stat()) break; filterResult = readAfter(req, tb, buf, func); if (filterResult == READ_SUCCESS) ++count; if (tb->stat() || (filterResult == FILTER_BREAK)) break; } } seriarizeRecordsetEnd(m_tb, buf, count, req.result.returnBeforeValue); } int dbExecuter::commandExec(std::vector& requests, netsvc::server::netWriter* nw) { request& req = requests[0]; resultBuffer buf(nw->ptr()); size_t& size = nw->datalen; try { switch (req.op) { case HS_OP_QUIT: return EXECUTE_RESULT_QUIT; case HS_OP_AUTH: writeStatus(0, buf, 1); // allways success. break; case HS_OP_OPEN: { checkNewHandle(req.handle); bool created; database* db = getDatabase(req.db.name, 0 /*cid*/, created); m_tb = db->openTable(req.table.name, req.table.openMode, NULL, ""); if (m_tb) { addHandle(getDatabaseID(0 /*cid*/), m_tb->id(), req.handle); m_tb = getTable(req.handle); m_tb->setUseFieldList(req.table.fields); m_tb->setKeyNum(req.table.key.name); writeStatus(m_tb->stat(), buf, 1); }else writeStatus(db->stat(), buf, 1); break; } case HS_OP_INSERT: { m_tb = getTable(req.handle, SQLCOM_INSERT); m_tb->clearBuffer(); m_tb->setUseValues(req.table.values, 0); m_tb->insert(true); writeStatus(m_tb->stat(), buf, 1); break; } case HS_OP_UPDATE_INC: case HS_OP_UPDATE_DEC: case HS_OP_UPDATE: { m_tb = getTable(req.handle, SQLCOM_UPDATE); doRecordOperation(req, m_tb, buf, update); break; } case HS_OP_DELETE: { m_tb = getTable(req.handle, SQLCOM_UPDATE); doRecordOperation(req, m_tb, buf, del); break; } case HS_OP_READ: { m_tb = getTable(req.handle); doRecordOperation(req, m_tb, buf, NULL); break; } } if (m_tb) m_tb->unUse(); size = buf.size(); return EXECUTE_RESULT_SUCCESS; } catch (bzs::rtl::exception& e) { clenupNoException(); const std::string* msg = getMsg(e); const int* code = getCode(e); if (code && msg) writeError(*code, buf, msg->c_str()); else { if (msg) writeError(1, buf, msg->c_str()); else writeError(1, buf, ""); sql_print_error("%s", boost::diagnostic_information(e).c_str()); } printWarningMessage(code, msg); } catch (...) { clenupNoException(); writeError(1, buf, "Unknown error."); } size = buf.size(); return EXECUTE_RESULT_SUCCESS; } // --------------------------------------------------------------------------- // class commandExecuter // --------------------------------------------------------------------------- #define PARSEREAD_START 0 #define PARSEREAD_COMMAND 1 #define PARSEREAD_HANDLEID 2 #define PARSEREAD_SEEKCMD 3 #define PARSEREAD_DBNAME 4 #define PARSEREAD_TABLENAME 5 #define PARSEREAD_INDEXNAME 6 #define PARSEREAD_FIELDS 7 #define PARSEREAD_KEYVAL_COUNT 8 #define PARSEREAD_KEYVAL 9 #define PARSEREAD_LIMIT 10 #define PARSEREAD_OFFSET 11 #define PARSEREAD_DATA_COUNT 12 #define PARSEREAD_DATA_VAL 13 #define PARSEREAD_IN_MODE 15 #define PARSEREAD_IN_KEYPART 16 #define PARSEREAD_IN_COUNT 17 #define PARSEREAD_IN_VAL 18 #define PARSEREAD_FL_TYPE 19 #define PARSEREAD_FL_LOG 20 #define PARSEREAD_FL_COL 21 #define PARSEREAD_FL_VAL 22 inline void setHandleID(const std::string& src, int& parseMode, request* req) { req->handle = (short)atol(src.c_str()); if (req->op == HS_OP_OPEN) parseMode = PARSEREAD_DBNAME; else parseMode = PARSEREAD_SEEKCMD; } inline void setCmd(const std::string& src, int& parseMode, request* req) { if (src[0] >= '0' && src[0] <= '9') return setHandleID(src, parseMode, req); else req->op = src[0]; parseMode = PARSEREAD_HANDLEID; } inline void setChangeCmd(const std::string& src, int& parseMode, request* req) { req->op = src[0]; if (req->table.key.logical && (req->op == HS_OP_INSERT)) req->op = HS_OP_UPDATE_INC; req->result.returnBeforeValue = (src[1] == '?'); if (req->op == HS_OP_INSERT) parseMode = PARSEREAD_DATA_COUNT; else parseMode = PARSEREAD_DATA_VAL; } inline void setSeekCmd(const std::string& src, int& parseMode, request* req) { if ((src[0] == HS_OP_INSERT) || (src[0] == HS_OP_UPDATE) || (src[0] == HS_OP_DELETE)) return setChangeCmd(src, parseMode, req); req->table.key.logical = src[0]; if ((src.size() > 1) && (src[1] == '=')) req->table.key.logical += 0xff; req->result.returnBeforeValue = true; parseMode = PARSEREAD_KEYVAL_COUNT; } inline void setDbname(const std::string& src, int& parseMode, request* req) { strncpy(req->db.name, src.c_str(), DEBNAME_SIZE); parseMode = PARSEREAD_TABLENAME; } inline void setTablename(const std::string& src, int& parseMode, request* req) { strncpy(req->table.name, src.c_str(), TABELNAME_SIZE); req->table.openMode = TD_OPEN_NORMAL; parseMode = PARSEREAD_INDEXNAME; } inline void setIndexname(const std::string& src, int& parseMode, request* req) { strncpy(req->table.key.name, src.c_str(), INDEXNAME_SIZE); parseMode = PARSEREAD_FIELDS; } inline void setValueCount(const std::string& src, int& parseMode, int& count) { count = atol(src.c_str()); } inline void setValue(const std::string& src, int& parseMode, std::vector& values, int& count) { if (count) { values.push_back(src); --count; } } inline bool setLimit(const std::string& src, int& parseMode, request* req) { if ((src[0] >= '0') && (src[0] <= '9')) { req->result.limit = atol(src.c_str()); parseMode = PARSEREAD_OFFSET; return false; } parseMode = PARSEREAD_IN_MODE; return true; } inline void setOffset(const std::string& src, int& parseMode, request* req) { req->result.offset = atol(src.c_str()); parseMode = PARSEREAD_IN_MODE; } inline void setFields(const std::string& src, int& parseMode, request* req) { req->table.fields = src; parseMode = PARSEREAD_START; } inline bool setInMode(const std::string& src, int& parseMode, request* req) { if (src[0] == '@') { parseMode = PARSEREAD_IN_KEYPART; return false; } parseMode = PARSEREAD_FL_TYPE; return true; } inline void setInKeyPart(const std::string& src, int& parseMode, request* req) { req->table.in.keypart = (short)atol(src.c_str()); parseMode = PARSEREAD_IN_COUNT; } inline bool setFilterType(const std::string& src, int& parseMode, request* req) { if ((src[0] == 'F') || (src[0] == 'W')) { request::table::filter f; f.type = src[0]; req->table.filters.push_back(f); parseMode = PARSEREAD_FL_LOG; return false; } parseMode = PARSEREAD_SEEKCMD; return true; } inline void setFilterLog(const std::string& src, int& parseMode, request* req) { request::table::filter& f = req->table.filters[req->table.filters.size() - 1]; f.logical = src[0]; if ((src.size() > 1) && (src[1] == '=')) f.logical += 0xff; else if ((src.size() > 1) && (src[1] == '>')) f.logical += 0xfe; parseMode = PARSEREAD_FL_COL; } inline void setFilterCol(const std::string& src, int& parseMode, request* req) { request::table::filter& f = req->table.filters[req->table.filters.size() - 1]; f.col = (short)atol(src.c_str()); parseMode = PARSEREAD_FL_VAL; } inline void setFilterVal(const std::string& src, int& parseMode, request* req) { request::table::filter& f = req->table.filters[req->table.filters.size() - 1]; f.value = src; parseMode = PARSEREAD_FL_TYPE; } commandExecuter::commandExecuter(netsvc::server::IAppModule* mod) : m_dbExec(new dbExecuter(mod)) { } commandExecuter::~commandExecuter() { m_dbExec.reset(); } size_t commandExecuter::perseRequestEnd(const char* p, size_t transfered, bool& comp) const { if (transfered) { const char* tmp = p + transfered - 1; comp = (*tmp == '\n'); } if (!comp && transfered < strlen(p) + 10) return transfered + 64000; return 0; } char* readTorkn(std::string& buf, char* src, const char* end) { buf.erase(); while (!((*src == '\t') || (*src == '\n') || (*src == '\r'))) { if (src == end) break; if (*src == 0x01) *(++src) -= 0x040; buf.append(1, *src); ++src; } return ++src; } bool commandExecuter::parse(const char* ptr, size_t size) { m_requests.clear(); char* p = (char*)ptr; const char* end = p + size; int parseMode = PARSEREAD_START; int keyval_count = 0; int data_count = INT_MAX; int in_count = 0; bool skip = false; std::string buf; buf.reserve(1024); request* req = NULL; while (p != end) { if (skip == false) p = readTorkn(buf, p, end); else skip = false; if (buf.size() == 0) break; switch (parseMode) { case PARSEREAD_START: m_requests.push_back(request()); req = &m_requests[m_requests.size() - 1]; parseMode = PARSEREAD_COMMAND; skip = true; break; case PARSEREAD_COMMAND: setCmd(buf, parseMode, req); break; case PARSEREAD_HANDLEID: setHandleID(buf, parseMode, req); break; case PARSEREAD_SEEKCMD: setSeekCmd(buf, parseMode, req); break; case PARSEREAD_DATA_COUNT: setValueCount(buf, parseMode, data_count); parseMode = PARSEREAD_DATA_VAL; break; case PARSEREAD_DATA_VAL: setValue(buf, parseMode, req->table.values, data_count); if (data_count == 0) parseMode = PARSEREAD_START; break; case PARSEREAD_KEYVAL_COUNT: setValueCount(buf, parseMode, keyval_count); parseMode = PARSEREAD_KEYVAL; break; case PARSEREAD_KEYVAL: setValue(buf, parseMode, req->table.key.values, keyval_count); if (keyval_count == 0) parseMode = PARSEREAD_LIMIT; break; case PARSEREAD_DBNAME: setDbname(buf, parseMode, req); break; case PARSEREAD_TABLENAME: setTablename(buf, parseMode, req); break; case PARSEREAD_INDEXNAME: setIndexname(buf, parseMode, req); break; case PARSEREAD_FIELDS: setFields(buf, parseMode, req); break; case PARSEREAD_LIMIT: skip = setLimit(buf, parseMode, req); break; case PARSEREAD_OFFSET: setOffset(buf, parseMode, req); break; case PARSEREAD_IN_MODE: skip = setInMode(buf, parseMode, req); break; case PARSEREAD_IN_KEYPART: setInKeyPart(buf, parseMode, req); break; case PARSEREAD_IN_COUNT: setValueCount(buf, parseMode, in_count); parseMode = PARSEREAD_IN_VAL; break; case PARSEREAD_IN_VAL: setValue(buf, parseMode, req->table.in.values, in_count); if (in_count == 0) parseMode = PARSEREAD_FL_TYPE; break; case PARSEREAD_FL_TYPE: skip = setFilterType(buf, parseMode, req); break; case PARSEREAD_FL_LOG: setFilterLog(buf, parseMode, req); break; case PARSEREAD_FL_COL: setFilterCol(buf, parseMode, req); parseMode = PARSEREAD_FL_VAL; break; case PARSEREAD_FL_VAL: setFilterVal(buf, parseMode, req); break; } } return !(m_requests.size() == 0); } } // namespace hs } // namespace protocol } // namespace db } // namespace bzs #endif // USE_HANDLERSOCKET