/*=================================================================
   Copyright (C) 2012 2013 2014 BizStation Corp All rights reserved.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; either version 2
   of the License, or (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
   02111-1307, USA.
=================================================================*/
#include "appModule.h"
#include "appBuilderImple.h"
#include <bzs/db/engine/mysql/dbManager.h>
#include <bzs/db/engine/mysql/mysqlThd.h>
#include <bzs/db/protocol/tdap/mysql/tdapCommandExecuter.h>
#include <bzs/netsvc/server/iserver.h>
#include <stdlib.h>
#include <boost/asio/ip/address_v4.hpp>

#ifdef USE_HANDLERSOCKET
#include <bzs/db/protocol/hs/hsCommandExecuter.h>
#endif

using namespace bzs::netsvc::server;

/** Factory used by the server: builds one transactd module for a connection.
 *  All four arguments are forwarded unchanged to the module constructor.
 *  The returned object is allocated with new.
 */
IAppModule* IMyPluginModule::create(
    const boost::asio::ip::tcp::endpoint& endpoint, iconnection* connection,
    bool tpool, int type)
{
    IAppModule* created =
        new bzs::db::transactd::module(endpoint, connection, tpool, type);
    return created;
}

namespace bzs
{
namespace netsvc
{
namespace server
{

// List of the live modules. modulesMutex is locked around every access to
// modules made in this file (module constructor and destructor).
boost::mutex modulesMutex;
std::vector<IAppModule*> modules;
using namespace bzs::netsvc::server;

} // namespace server
} // namespace netsvc

namespace db
{
using namespace protocol;
namespace transactd
{

// Largest payload sent by one asynchronous write.
#define ASYNCWRITE_DATA_SIZE 16374 // 16384 - 10
// Size of the netAsyncWriter staging buffer. The trailing 6 matches the
// 4 byte segment mark plus the 2 byte length that precede the data.
// NOTE(review): 65536 presumably leaves room for one maximum-size record
// on top of a pending chunk - confirm against the callers.
#define ASYNC_BUFFER_SIZE (ASYNCWRITE_DATA_SIZE + 65536 + 6)

// Marker copied to the head of the stream and into the end-mark segment.
static const unsigned int segment_mark = 0xFFFFFFFF;

/* netAsyncWriter protocol description
 segment_mark   4  system   include
RETBUF_EXT_RESERVE_SIZE paramMask 2 system include RETBUF_EXT_RESERVE_SIZE result 2 system include RETBUF_EXT_RESERVE_SIZE rows 2 data data... datalen data datalen 4 system clients are no count this size rows 2 system clients are no count this size totallen 4 system include RETBUF_EXT_RESERVE_SIZE (not include already sent) */ class netAsyncWriter : public netWriter { iconnection* m_conn; char* m_buf; char* m_data; size_t m_defBuffferPos; unsigned short m_rows; inline void doWrite() { unsigned int asyncDataSize = (unsigned int)(m_curPtr - m_data); unsigned int size = ASYNCWRITE_DATA_SIZE; if (asyncDataSize < ASYNCWRITE_DATA_SIZE) size = asyncDataSize; int offset = (int)(m_data - m_buf); *((unsigned short*)(m_data - 2)) = (unsigned short)size; m_conn->asyncWrite(m_buf, size + offset); asyncDataSize -= size; memmove(m_buf + 2, m_data + size, asyncDataSize); m_data = m_buf + 2; m_curPtr = m_data + asyncDataSize; } inline void writeEndMark() { memcpy(m_data, &segment_mark, 4); unsigned short tmp = 4; memcpy(m_data + tmp, &datalen, 4); memcpy(m_data + tmp + 4, &m_rows, 2); *((unsigned short*)(m_data - 2)) = tmp; m_conn->asyncWrite(m_buf, tmp + 2 + 4 + 2); m_curPtr = m_data; } public: netAsyncWriter(iconnection* conn) : netWriter(), m_conn(conn), m_defBuffferPos(0) { m_buf = new char[ASYNC_BUFFER_SIZE]; memcpy(m_buf, &segment_mark, sizeof(unsigned int)); m_data = m_buf + 2 + 4; } ~netAsyncWriter() { delete[] m_buf; } void reset(IResultBuffer* retBuf, buffers* optData) { netWriter::reset(retBuf, optData); m_defBuffferPos = 0; memcpy(m_buf, &segment_mark, sizeof(unsigned int)); m_data = m_buf + 2 + 4; m_curPtr = m_data; } void beginExt(bool includeBlob) { short result = 0; ushort_td paramMask = getParamMask(includeBlob); asyncWrite((const char*)(¶mMask), sizeof(ushort_td)); asyncWrite((const char*)(&result), sizeof(short)); asyncWrite((const char*)(&m_rows), sizeof(unsigned short)); datalen = sizeof(unsigned short); // rows space; m_rows = 0; } bool asyncWrite(const char* 
p, unsigned int size, eWriteMode mode = copyOnly) { unsigned int asyncDataSize = (unsigned int)(m_curPtr - m_data); // client detabuffer orver flow check. don't use minus unsigned // variables if (m_clientBuffferSize < datalen + size) return false; datalen += size; if (mode == curSeekOnly) m_curPtr += size; else if (mode == netwrite) { if (asyncDataSize > ASYNCWRITE_DATA_SIZE) doWrite(); } else if (mode == writeEnd) { if (asyncDataSize) doWrite(); writeEndMark(); } else { memcpy(m_curPtr, p, size); m_curPtr += size; } return true; } // write to default buffer bool write(const char* p, size_t size, eWriteMode mode = copyOnly) { if (mode >= netwrite) return true; if (resultBuffer->size() < m_defBuffferPos + size) return false; if (mode != curSeekOnly) memcpy(resultBuffer->ptr() + m_defBuffferPos, p, size); m_defBuffferPos += size; datalen += size; m_curPtr = resultBuffer->ptr() + m_defBuffferPos; return true; } void incremetRows() { ++m_rows; } size_t bufferSpace() const { return ASYNC_BUFFER_SIZE - (curPtr() - m_buf); } unsigned int resultLen() const { return (unsigned int)datalen + RETBUF_EXT_RESERVE_SIZE; } unsigned short getParamMask(bool includeBlob) { ushort_td paramMask = P_MASK_DATA | P_MASK_FINALDATALEN | P_MASK_FINALRET; if (!engine::mysql::table::noKeybufResult) paramMask |= P_MASK_KEYBUF; if (includeBlob) paramMask |= P_MASK_BLOBBODY; return paramMask; } /* Increment total deta size space only. The header and contents are already sent This space is include RETBUF_EXT_RESERVE_SIZE. */ void writeHeadar(unsigned short paramMask, short result) { write(NULL, 4, curSeekOnly); datalen -= 4; } /* allreadySent is async write size. writeHeadar size is not include. 
*/ unsigned int allreadySent() const { return resultLen() - 4; } }; class netStdWriter : public netWriter { unsigned short* m_rowsPos; public: netStdWriter() : netWriter() {} void beginExt(bool includeBlob) { m_curPtr = m_ptr + RETBUF_EXT_RESERVE_SIZE; m_rowsPos = (unsigned short*)m_curPtr; (*m_rowsPos) = 0; datalen = sizeof(unsigned short); // rows space; m_curPtr += 2; } unsigned int resultLen() const { return (unsigned int)(datalen + RETBUF_EXT_RESERVE_SIZE); } bool asyncWrite(const char* p, unsigned int size, eWriteMode mode = copyOnly) { if (mode >= netwrite) return true; if (m_clientBuffferSize < datalen + size) return false; if (mode != curSeekOnly) memcpy(m_curPtr, p, size); m_curPtr += size; datalen += size; return true; } bool write(const char* p, size_t size, eWriteMode mode = copyOnly) { return asyncWrite(p, (unsigned int)size, mode); } void incremetRows() { ++(*m_rowsPos); } unsigned short getParamMask(bool includeBlob) { ushort_td paramMask = (engine::mysql::table::noKeybufResult == false) ? P_MASK_READ_EXT : P_MASK_DATA | P_MASK_DATALEN; if (includeBlob) paramMask |= P_MASK_BLOBBODY; return paramMask; } void writeHeadar(unsigned short paramMask, short result) { char* p = ptr() + sizeof(unsigned int); // 4 memcpy(p, (const char*)(¶mMask), sizeof(ushort_td)); // 2 p += sizeof(ushort_td); memcpy(p, (const char*)(&result), sizeof(short_td)); // 2 p += sizeof(short_td); memcpy(p, (const char*)&datalen, sizeof(uint_td)); // 4 } unsigned int allreadySent() const { return 0; } }; /** The module created for every connection * In the case of a thread pool, thread termination processing is not performed * by a destructor. 
*/ module::module(const boost::asio::ip::tcp::endpoint& endpoint, iconnection* connection, bool tpool, int type) : m_endpoint(endpoint), m_connection(connection), m_useThreadPool(tpool) { if (type & PROTOCOL_TYPE_BTRV) m_commandExecuter.reset(new protocol::tdap::mysql::commandExecuter( (unsigned __int64) this)); #ifdef USE_HANDLERSOCKET else if (type & PROTOCOL_TYPE_HS) m_commandExecuter.reset( new protocol::hs::commandExecuter((unsigned __int64) this)); #endif boost::mutex::scoped_lock lck(modulesMutex); modules.push_back(this); if (type & PROTOCOL_TYPE_ASYNCWRITE) m_nw = new netAsyncWriter(connection); else m_nw = new netStdWriter(); } module::~module(void) { boost::mutex::scoped_lock lck(modulesMutex); modules.erase(find(modules.begin(), modules.end(), this)); delete m_nw; m_commandExecuter.reset(); if (m_useThreadPool == false) { my_thread_end(); endThread(); } } void module::reset() { } /** It is called from the handler of async_read. * A value is analyzed and it is answered whether a lead is completion. * @return Size required for a lead buffer is returned on the occasion of *reading by addition. * When zero are returned, it is shown that it is not necessary to *enlarge a buffer further. 
*/ size_t module::onRead(const char* data, size_t size, bool& complete) { m_readBuf = data; m_readSize = size; return m_commandExecuter->perseRequestEnd(data, size, complete); } size_t module::onAccept(char* message, size_t bufsize) { return m_commandExecuter->getAcceptMessage(message, bufsize); } static const char* addressMasks[3] = { ".0.0.0/255.0.0.0", ".0.0/255.255.0.0", ".0/255.255.255.0" }; char* addressClass(char* buf, int bufsize, const char* host, int type) { strcpy_s(buf, bufsize, host); for (int i = 3; i >= type; i--) { char* p = strrchr(buf, '.'); if (p == NULL) return buf; *p = 0x00; } strcat_s(buf, bufsize, addressMasks[type - 1]); return buf; } bool isAclUser(const char* host, const char* user) { bool ret = is_acl_user(host, user); if (ret) return true; for (int i = 1; i <= 3; i++) { char buf[256]; ret = is_acl_user(addressClass(buf, 256, host, i), user); if (ret) return true; } return false; } bool module::checkHost(const char* hostCheckname) { std::string addr = m_endpoint.address().to_string(); size_t pos = addr.find_last_of(":"); if (pos != std::string::npos) addr = addr.substr(pos + 1); bool ret = true; if (!isAclUser(addr.c_str(), hostCheckname)) { ret = isAclUser(m_endpoint.address().to_string().c_str(), hostCheckname); if (!ret && m_endpoint.address().is_v4()) { if (addr == std::string("127.0.0.1")) ret = isAclUser("localhost", hostCheckname); } } return ret; } int module::execute(netsvc::server::IResultBuffer& result, size_t& size, netsvc::server::buffers* optionalData) { m_commandExecuter->parse(m_readBuf, m_readSize); m_nw->reset(&result, optionalData); boost::mutex::scoped_lock lck(m_mutex); int ret = m_commandExecuter->execute(m_nw); if (m_useThreadPool) cleanup(); size = m_nw->datalen; return ret; } void module::disconnect() { m_connection->close(); } } // namespace transactd } // namespace db } // namespace bzs